Co-partitioning Kafka Tables in a Faust Application

When managing stream partitions and their corresponding changelog partitions, "co-partitioning" ensures that stateful processing is distributed correctly among the available clients. One requirement is that tables and streams must share shards: the stream's source topic and the table's changelog topic need the same number of partitions, and records must be keyed the same way.

To shard the table differently, you must first repartition the stream using group_by.
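
As a quick illustration (a minimal sketch with placeholder app, topic, and model names, not the video's exact code), group_by re-keys the stream so the table updated inside the agent shares partitioning with it:

import faust

app = faust.App('copartition-sketch', broker='localhost:9092')

class Withdrawal(faust.Record, serializer='json'):
    user_id: int
    country: str
    amount: int

withdrawals = app.topic('withdrawals', value_type=Withdrawal)
totals = app.Table('country_totals', default=int)

@app.agent(withdrawals)
async def track(stream):
    # group_by re-keys every event by .country via an internal repartition
    # topic, so updates to the country-keyed table land on the same
    # partition as the events that drive them.
    async for w in stream.group_by(Withdrawal.country):
        totals[w.country] += w.amount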

This video explains this topic in depth.

Prerequisite:
-------------------
Understanding that Kafka Topic Partitions Still Drive Parallelism in Faust
Create a table for a Kafka stream using Python Faust

Documentation Link:
--------------------------------

Code is available in the pinned comment.

Check this playlist for more Data Engineering related videos:

Apache Kafka from scratch

Snowflake Complete Course from scratch with End-to-End Project and in-depth explanation --

🙏 You just need to do 3 things to support my channel: LIKE, SHARE & SUBSCRIBE to my YouTube channel.
Comments

Code:


Start Zookeeper:
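
The command was not included in the comment; assuming a standard Apache Kafka distribution run from its install directory, Zookeeper is started with:

bin/zookeeper-server-start.sh config/zookeeper.properties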



Start Broker:
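
Likewise, assuming the same standard layout, the broker is started with:

bin/kafka-server-start.sh config/server.properties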





Incorrect Case (stream not repartitioned, so the table is not co-partitioned):

bin/kafka-topics.sh --create --topic transactions_topic1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3

Producer Code:

from time import sleep
from json import dumps
from kafka import KafkaProducer
import random

topic_name = "transactions_topic1"


def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    # The return value was missing in the comment; mapping the numeric key
    # onto the sorted partition list is an assumed, minimal implementation.
    return all_partitions[int(key.decode()) % len(all_partitions)]


producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=custom_partitioner,
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

country_list = ["India", "Nepal", "USA", "Bhutan"]

for e in range(10):
    data = {"user_id": e, "country": random.choice(country_list), "amount": 1}
    print("Inserting the data:", data)
    producer.send(topic_name, key=str(e).encode(), value=data)
    sleep(0.2)

producer.flush()



Consumer Code:

import faust

app = faust.App('demo-transactions_grouping12345', broker='localhost:9092', topic_partitions=3)


class withdrawals_data(faust.Record, serializer='json'):
    user_id: int
    country: str
    amount: int


input_topic = app.topic('transactions_topic1', value_type=withdrawals_data)
# The variable name was lost in the comment; it is assumed from the table name.
country_wise_withdrawals = app.Table("country_wise_withdrawals1234", default=int)


@app.agent(input_topic)
async def processor(stream):
    # No group_by here: the stream is still partitioned by the producer's
    # user_id key while the table is keyed by country, so the stream and
    # the table's changelog are NOT co-partitioned.
    async for message in stream:
        print(message)
        country_wise_withdrawals[message.country] += 1
        # The final print was truncated in the comment; rendering the table
        # as an ANSI table is an assumption.
        print(country_wise_withdrawals.as_ansitable(title='Table'))

Start instances:

faust -A main worker -l info
faust -A main worker -l info --web-port=6067
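
To verify co-partitioning, you can describe the table's changelog topic and compare its partition count with the source topic's. Faust names changelog topics <app-id>-<table-name>-changelog, so with the names above the check would look like:

bin/kafka-topics.sh --describe --topic demo-transactions_grouping12345-country_wise_withdrawals1234-changelog --bootstrap-server localhost:9092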


Correct Case:

bin/kafka-topics.sh --create --topic transactions_topic2 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3

Producer:

from time import sleep
from json import dumps
from kafka import KafkaProducer
import random

topic_name = "transactions_topic2"


def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    # As above, the return value was missing in the comment; this minimal
    # implementation is an assumption.
    return all_partitions[int(key.decode()) % len(all_partitions)]


producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=custom_partitioner,
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

country_list = ["India", "Nepal", "USA", "Bhutan"]

for e in range(10):
    data = {"user_id": e, "country": random.choice(country_list), "amount": 1}
    print("Inserting the data:", data)
    producer.send(topic_name, key=str(e).encode(), value=data)
    sleep(0.2)

producer.flush()

Consumer:


import faust

app = faust.App('demo-transactions_grouping12345678912', broker='localhost:9092', topic_partitions=3)


class withdrawals_data(faust.Record, serializer='json'):
    user_id: int
    country: str
    amount: int


input_topic = app.topic('transactions_topic2', value_type=withdrawals_data)
# As before, the variable name is assumed from the table name.
country_wise_withdrawals = app.Table("country_wise_withdrawals12346789123", default=int)


@app.agent(input_topic)
async def processor(stream):
    # group_by re-keys the stream by country through an internal repartition
    # topic, so the repartitioned stream and the country-keyed table ARE
    # co-partitioned.
    async for message in stream.group_by(withdrawals_data.country):
        print(message)
        country_wise_withdrawals[message.country] += 1
        # The truncated final print is again assumed to render the table.
        print(country_wise_withdrawals.as_ansitable(title='Table'))

Start instances:

faust -A main worker -l info
faust -A main worker -l info --web-port=6067
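
With group_by in place, Faust also creates an internal repartition topic keyed by country, in addition to the table's changelog topic. Listing all topics should show both alongside transactions_topic2:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092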

KnowledgeAmplifier

Thank you. You have great videos. I have been using this framework for a long time for data science projects.
I wish your channel many more subscribers and rapid growth!

serg