Understanding that Kafka Topic Partitions Still Drive Parallelism in Faust

Let's explore whether Kafka topic partitions still drive parallelism in Faust.
Prerequisite:
--------------------------
Kafka Topic Partitions & Producers using Python
Parallel Processing in Kafka Consumer
Reference Documentation:
-----------------------------------------------
Code:
--------------
Start Zookeeper:
------------------------------
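The command itself isn't shown in the description; with a stock Kafka download, ZooKeeper is normally started with:
bin/zookeeper-server-start.sh config/zookeeper.properties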
Start Broker:
------------------------------
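Again assuming the default server config that ships with Kafka:
bin/kafka-server-start.sh config/server.properties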
Create Kafka Topic:
------------------------------
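A likely equivalent for creating the hello_world5 topic used below (Kafka 2.2+ syntax); the partition count of 3 is an assumption, but more than one partition is needed for the parallelism demo:
bin/kafka-topics.sh --create --topic hello_world5 --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1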
Parallel Processing:
----------------------
from time import sleep
from json import dumps
from kafka import KafkaProducer

def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    print("The key is : {}".format(key))
    print("All partitions : {}".format(all_partitions))
    # Route by key so the same key always lands on the same partition
    return all_partitions[int(key) % len(all_partitions)]

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         partitioner=custom_partitioner,
                         value_serializer=lambda v: dumps(v).encode('utf-8'))
topic_name = 'hello_world5'

for e in range(1000):
    data = {'number': e}
    print(data)
    # The key drives custom_partitioner; the value is JSON-serialized by dumps
    producer.send(topic_name, key=str(e).encode('utf-8'), value=data)
    sleep(0.5)
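To verify which partition a record actually lands on (an addition, not part of the original snippet), kafka-python's send() returns a future whose metadata carries the chosen partition:

future = producer.send(topic_name, key=b'42', value={'number': 42})
record_metadata = future.get(timeout=10)  # RecordMetadata: topic, partition, offset
print("Record landed on partition {}".format(record_metadata.partition))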
Consumer:
--------------
# Save as main.py so the worker commands below can find the app
import faust

app = faust.App('demo-streaming', broker='kafka://localhost:9092')
topic = app.topic('hello_world5')  # same topic the producer writes to

@app.agent(topic)
async def processor(stream):
    async for message in stream:
        print(f'Received {message}')
Start instances:
------------------
faust -A main worker -l info
faust -A main worker -l info --web-port=6067
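Both workers use the same app id ('demo-streaming'), so they join one Kafka consumer group and the broker splits the topic's partitions between them; any worker beyond the partition count sits idle, which is why partitions still cap parallelism. Assuming Faust uses the app id as the consumer group id, the assignment can be inspected with:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group demo-streaming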
Check this playlist for more Data Engineering related videos:
Apache Kafka from scratch
Snowflake Complete Course from scratch with an End-to-End Project and in-depth explanation
🙏🙏🙏🙏🙏🙏🙏🙏
YOU JUST NEED TO DO
3 THINGS to support my channel
LIKE
SHARE
&
SUBSCRIBE
TO MY YOUTUBE CHANNEL