Kafka Producer Tutorial - Complete Guide with Code Example

Welcome to this comprehensive Kafka Producer tutorial! In this video, we’ll dive deep into the fundamentals of Kafka producers and cover everything you need to know to get started building your own producer applications.

Here's what we'll cover:

Kafka Producer Basics: Learn what a Kafka producer is and how it fits into the Kafka ecosystem.
Producer Workflow: Understand the steps for sending messages from your application to Kafka.
Compression Techniques: See how to configure compression with options like gzip or snappy to make your data transfers more efficient (a quick config sketch follows this list).
Code Walkthrough: Watch as we write and run a Python Kafka producer application, with detailed explanations of each step.
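As a quick taste of the compression setting before the full walkthrough, here is a minimal sketch using confluent-kafka-python; the broker address is a hypothetical local placeholder, and the rest of the configuration is covered in the full code below.

from confluent_kafka import Producer

# Minimal producer configuration focused on compression.
# librdkafka-based clients such as confluent-kafka-python accept
# 'gzip', 'snappy', 'lz4', 'zstd', or 'none' for 'compression.type'.
config = {
    'bootstrap.servers': 'localhost:9092',  # hypothetical local broker
    'compression.type': 'gzip'              # or 'snappy' for lower CPU cost
}
producer = Producer(config)

gzip generally compresses harder, while snappy trades some compression ratio for speed; which one is the better fit depends on your payloads and CPU budget.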

Don’t forget to like, subscribe, and hit the bell icon for more tutorials on Kafka, Apache Spark, and real-time data streaming!

Song: Dawn

Tags:
spark structured streaming, spark structured streaming with kafka using pyspark, spark structured streaming databricks, spark structured streaming tutorial, spark structured streaming pyspark, spark structured streaming interview questions, spark structured streaming checkpoint, spark structured streaming example, apache spark structured api, watermarking in spark structured streaming, apache kafka, apache kafka tutorial for beginners, apache kafka architecture, apache kafka fundamentals, spark structured streaming playlist, apache spark playlist, real time streaming, real time application, real time data engineering, data engineering, near real time streaming
Comments

Thank you for the detailed video. Can you please share the Python Kafka producer and consumer application code?

ranjitpattnaik

Kafka producer

from confluent_kafka import Producer
import json
import time
import os

config = {
    'bootstrap.servers': 'pkc-9q8rv.ap-south-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv("KAFKA_API_KEY"),
    'sasl.password': os.getenv("KAFKA_API_SECRET"),
    'client.id': 'transaction-producer',
    'acks': 'all',              # wait for all in-sync replicas to acknowledge
    'retries': 5,               # retry transient send failures
    'batch.size': 16384,        # maximum batch size in bytes
    'linger.ms': 5,             # wait up to 5 ms to fill a batch
    'compression.type': 'gzip'  # compress batches before sending
}

# Initialize the Kafka producer
producer = Producer(config)

# Define the topic to send data to
topic = 'my_first_topic'


# Callback to handle delivery reports (called once for each message)
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [Partition: {msg.partition()}] at Offset: {msg.offset()}")


# Function to produce messages to Kafka
def produce_messages():
    for i in range(10):
        key = f"key-{i}"
        value = json.dumps({"id": i, "message": f"sample message {i}"})

        print(f"Producing message: Key = {key} Value = {value}")
        # Send message with key-value and delivery report callback
        producer.produce(
            topic=topic,
            key=key,
            value=value,
            callback=delivery_report
        )

        # Poll to trigger the delivery report callback
        producer.poll(0)

        # Optional: Add delay for demonstration purposes
        time.sleep(1)

    # Flush the producer to ensure all messages are sent
    producer.flush()


# Run the producer function
if __name__ == "__main__":
    produce_messages()
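
And for the consumer side requested above, a minimal sketch, assuming the same Confluent Cloud cluster, credentials, and topic as the producer; the 'transaction-consumer' group id is a hypothetical name.

from confluent_kafka import Consumer
import os

consumer_config = {
    'bootstrap.servers': 'pkc-9q8rv.ap-south-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv("KAFKA_API_KEY"),
    'sasl.password': os.getenv("KAFKA_API_SECRET"),
    'group.id': 'transaction-consumer',  # hypothetical consumer group name
    'auto.offset.reset': 'earliest'      # start from the beginning if no committed offset
}

consumer = Consumer(consumer_config)
consumer.subscribe(['my_first_topic'])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"Consumed: Key = {msg.key().decode('utf-8')} Value = {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # commit final offsets and leave the group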

anirvandecodes