Create a table for a Kafka stream using Python Faust

A Kafka table is a distributed in-memory dictionary, backed by a Kafka changelog topic used for persistence and fault-tolerance.
This video explains how to work with Faust tables in depth.
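To build intuition for the changelog idea before the Faust examples below, here is a toy pure-Python sketch (no Faust required, all names hypothetical): every write to the table is also appended to a changelog, so the in-memory dict can be rebuilt by replaying the log — Faust does the same thing, but persists the changelog to a compacted Kafka topic.

```python
# Toy sketch of a changelog-backed table (illustration only; Faust uses
# a Kafka changelog topic instead of an in-process list).
changelog = []   # stands in for the Kafka changelog topic
table = {}       # the in-memory view

def table_set(key, value):
    changelog.append((key, value))  # record the change first
    table[key] = value              # then update the in-memory dict

def recover():
    """Rebuild the table by replaying the changelog from the start."""
    rebuilt = {}
    for key, value in changelog:
        rebuilt[key] = value        # later writes overwrite earlier ones
    return rebuilt

table_set('alice', 42)
table_set('alice', 43)
assert recover() == table           # replay restores the same state
```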

Code is available in the pinned comment.

Check this playlist for more Data Engineering related videos:

Apache Kafka from scratch

Snowflake Complete Course from scratch, with an end-to-end project and in-depth explanation

🙏 You just need to do 3 things to support my channel:
LIKE, SHARE & SUBSCRIBE to my YouTube channel.
Comments
Author

Code:

Start Zookeeper (run from the Kafka installation directory; paths may vary with your install):

bin/zookeeper-server-start.sh config/zookeeper.properties


Start the Broker:

bin/kafka-server-start.sh config/server.properties


Create the Kafka topic:

bin/kafka-topics.sh --create --topic hello_world --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1


Produce messages (each line you type is sent as one message; for the age-filter example below, paste JSON matching the Greeting record, e.g. {"name": "Alice", "age": 35}):

bin/kafka-console-producer.sh --topic hello_world --bootstrap-server localhost:9092

Age-Based Filter Example:

import faust

app = faust.App('demo-streaming', broker='localhost:9092')

class Greeting(faust.Record, serializer='json'):
    name: str
    age: int

input_topic = app.topic('hello_world', value_type=Greeting)
# Table values are ages (ints); default=int makes missing keys start at 0
aged_table = app.Table('major-count', key_type=str, value_type=int, partitions=1, default=int)

@app.agent(input_topic)
async def processor(stream):
    async for message in stream:
        if message.age > 30:
            aged_table[str(message.name)] = message.age
            print(f'{message.name} Tabled')
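To run the agent, start a Faust worker pointing at the module containing the app (the filename age_filter.py is an assumption — use whatever name you saved the code under):

```shell
# Start a Faust worker for the app in age_filter.py (assumed filename)
faust -A age_filter worker -l info
```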

Word-Count Example:

import faust

app = faust.App('demo-streaming1', broker='localhost:9092')

input_topic = app.topic('hello_world2', value_type=str, value_serializer='raw')
# default=int: a word seen for the first time starts counting from 0
word_count_table = app.Table('word-count', key_type=str, value_type=int, partitions=1, default=int)

@app.agent(input_topic)
async def processor(stream):
    async for message in stream:
        for word in message.split():
            word_count_table[str(word)] += 1
            print(f'{word} Tabled')
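The default=int argument gives the table collections.defaultdict-style behaviour: a missing key starts at 0, so += 1 works the first time a word is seen. A quick standard-library analogue of the counting loop (sample messages are made up for illustration):

```python
from collections import defaultdict

# Plain-Python analogue of the Faust word-count table above.
word_table = defaultdict(int)   # missing keys default to 0, like default=int

for message in ["hello kafka", "hello faust"]:
    for word in message.split():
        word_table[word] += 1

print(dict(word_table))  # {'hello': 2, 'kafka': 1, 'faust': 1}
```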

KnowledgeAmplifier
Author

Can we have joins between two Kafka topics using some key (the topics are created using MySQL connectors) with Faust?

Vikeshkumar-bpkj