Handling Large Messages with Apache Kafka: In-Depth Architecture & Programming

Unfortunately, Kafka imposes a limit on the size of the payload that can be sent to the broker (about 1 MB by default).
But there are various use cases where message payloads are large, such as image recognition, video analytics, audio analytics, and file processing.
This video covers the use cases, architectures, and libraries for handling large messages with Kafka.
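
For context, a minimal sketch (not from the video) of the producer-side size setting, using kafka-python and assuming a local broker; the broker enforces its own limit via message.max.bytes (about 1 MB by default), so both sides must be raised together for larger payloads:

from kafka import KafkaProducer

# max_request_size caps the largest request this producer will send;
# it must stay within the broker's message.max.bytes (or the topic-level max.message.bytes).
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumed local broker
    max_request_size=1048576,            # 1 MB, the library default
)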

Documentation Link:
----------------------------

Prerequisite:
-------------------------------
Best Practices for Storing Large Items and Attributes in DynamoDB

Pip Install Commands:
----------------------------------
pip install faust-large-message-serializer -t .
pip install kafka-python -t .
pip install faust-streaming -t .

Producer Code:
-----------------------
import logging
import random
import time

import faust
from faust import Record
from faust.serializers import codecs
from faust_large_message_serializer import LargeMessageSerializer, LargeMessageSerializerConfig

# Sample comments of varying length; the longer ones exceed the configured
# max_size and are therefore offloaded to S3.
data_points = [
    "Hello World",
    """Thank you so much for the explanation, I am currently building a project,
I have never heard of object storage until this video, I always thought its just cloud database + cloud storage,
this is such a neat idea !""",
    "Nice presentation. It is very clearly summarized. Thanks a lot",
    """It seems like the object storage is used mainly for not-so-very-often used static unstructured data. Thus I'm a bit baffled by the recommendation to use it in a video-streaming service. Wouldn't SAN be a better solution if I wanted to design a YouTube-like site?
And, can we use buckets for a CDN-like solution?
Thanks!""",
    "Bye",
    "Check where message going",
    "Hi There",
]

class UserModel(Record, serializer="s3_json"):
    comment: str

config = LargeMessageSerializerConfig(
    base_path="s3://irisseta/",
    max_size=100,  # payloads above 100 bytes are stored in S3 instead of Kafka
    large_message_s3_region="us-east-1",
    large_message_s3_access_key="",
    large_message_s3_secret_key="",
)

topic_name = "users_s3"

s3_backed_serializer = LargeMessageSerializer(topic_name, config, is_key=False)

# JSON is the first serializer in the chain; anything exceeding the configured
# maximum message size is then uploaded to the S3 bucket and replaced by a reference.
s3_json_serializer = codecs.get_codec("json") | s3_backed_serializer
codecs.register("s3_json", s3_json_serializer)

# Faust app configuration
app = faust.App("app_id", broker="kafka://localhost:9092")
users_topic = app.topic(topic_name, value_type=UserModel)

@app.timer(interval=5.0)  # interval chosen arbitrarily for the demo
async def send_users():
    data_user = {"comment": random.choice(data_points)}
    print(data_user)
    user = UserModel(**data_user)
    await users_topic.send(value=user)

if __name__ == "__main__":
    app.main()
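
To try the producer, assuming the snippet above is saved as producer.py, a broker running on localhost:9092, and valid S3 credentials in the config, start a Faust worker:

python producer.py worker -l info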

Consumer Code:
------------------------------------
import logging

import faust
from faust import Record
from faust.serializers import codecs
from faust_large_message_serializer import LargeMessageSerializer, LargeMessageSerializerConfig

class UserModel(Record, serializer="s3_json"):
    comment: str

config = LargeMessageSerializerConfig(
    base_path="s3://irisseta/",
    max_size=100,
    large_message_s3_region="us-east-1",
    large_message_s3_access_key="",
    large_message_s3_secret_key="",
)

topic_name = "users_s3"  # must match the topic used by the producer
s3_backed_serializer = LargeMessageSerializer(topic_name, config, is_key=False)

# Same serializer chain as the producer: the S3-backed codec fetches the payload
# back from the bucket whenever a record only carries a reference.
s3_json_serializer = codecs.get_codec("json") | s3_backed_serializer
codecs.register("s3_json", s3_json_serializer)

app = faust.App("app_id", broker="kafka://localhost:9092")
users_topic = app.topic(topic_name, value_type=UserModel)

@app.agent(users_topic)
async def users(users):
    async for user in users:
        print("The received event is : ", user)

if __name__ == "__main__":
    app.main()

Learn Apache Kafka from scratch

Check this playlist for more Data Engineering related videos:

Snowflake Complete Course from scratch with End-to-End Project with in-depth explanation--

🙏🙏🙏🙏🙏🙏🙏🙏
YOU JUST NEED TO DO
3 THINGS to support my channel
LIKE
SHARE
&
SUBSCRIBE
TO MY YOUTUBE CHANNEL
Comments:
----------------

Can it be done without using an S3 bucket?
- NandkishoreNangre

Very good explanation! How do you handle a large number of messages (like a bulk commit) using a Kafka source connector, and how does that relate to the Java heap size?
- yoyoyoo

Please make a video on how to enable Kafka monitoring. That'd be really helpful. Thanks.
- akashshelke

Is this valid for JSON and Avro messages also?
- CodeUpskill

Sir, what are the system requirements for big data?
- avinash