This repository has been archived by the owner on Apr 8, 2020. It is now read-only.
Ioannis Polyzos edited this page Jun 4, 2014 · 4 revisions

Camel-Kafka is an Apache Camel component that lets you work with the Apache Kafka message-oriented middleware.

Available as of Camel 2.13

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

Apache Kafka Overview

Apache Kafka is a distributed messaging system whose architecture, performance, and scalability have made it highly influential in modern messaging. It has been used successfully in several domains and projects, most notably LinkedIn's real-time activity data pipeline.

URI format

kafka:topicName[?options]

Below are examples with the mandatory options:

XML

<to uri="kafka:SampleTopic?zkConnect=localhost:2181&amp;metadataBrokerList=localhost:9092"/>

Java DSL

to("kafka:SampleTopic?zkConnect=localhost:2181&metadataBrokerList=localhost:9092");
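The URI format above can also be assembled programmatically. The following is a minimal sketch, not part of camel-kafka: the `KafkaUriSketch` class and its `buildUri` and `escapeForXml` methods are hypothetical helpers introduced only for illustration. It shows how the topic name and options combine into an endpoint URI, and why the XML form writes `&amp;` where a Java string uses a plain `&`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of camel-kafka.
class KafkaUriSketch {

    // Builds "kafka:topicName?key=value&key=value" from the options, in insertion order.
    static String buildUri(String topic, Map<String, String> options) {
        if (options.isEmpty()) {
            return "kafka:" + topic;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "kafka:" + topic + "?" + query;
    }

    // XML attribute values must write '&' as '&amp;'; Java string literals must not.
    static String escapeForXml(String uri) {
        return uri.replace("&", "&amp;");
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("zkConnect", "localhost:2181");
        options.put("metadataBrokerList", "localhost:9092");

        String uri = buildUri("SampleTopic", options);
        System.out.println(uri);               // form used in the Java DSL
        System.out.println(escapeForXml(uri)); // form used in an XML uri attribute
    }
}
```

Keeping the options in a `LinkedHashMap` preserves their order, so the generated URI reads the same as the hand-written examples.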

Options

Camel Kafka Defaults

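<table>
<tr><th>Property</th><th>Value</th></tr>
<tr><td>TOPIC</td><td>kafka.TOPIC</td></tr>
<tr><td>KEY</td><td>kafka.CONTENT_TYPE</td></tr>
<tr><td>OFFSET</td><td>kafka.OFFSET</td></tr>
<tr><td>PARTITION</td><td>kafka.EXCHANGE_NAME</td></tr>
<tr><td>PARTITION KEY</td><td>kafka.PARTITION_KEY</td></tr>
</table>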

Kafka Configuration attributes

<table>
<tr><th>Property</th><th>Default</th><th>Description</th></tr>
<tr><td><tt>topicName</tt></td><td>kafka.DEFAULT_TOPIC</td><td>Topic Name</td></tr>
<tr><td><tt>transferExchange</tt></td><td>false</td><td>Transfer Exchange</td></tr>
<tr><td><tt>concurrentConsumers</tt></td><td>10</td><td>Concurrent consumers (i.e. streams in this context)</td></tr>
<tr><td><tt>partitionKey</tt></td><td>DEFAULT_PARTITION</td><td>Partition Key</td></tr>
<tr><td><tt>groupId</tt></td><td>kafka.DEFAULT_GROUP</td><td>Group Id</td></tr>
<tr><td><tt>socketTimeoutMs</tt></td><td>30000</td><td>Socket Timeout (ms)</td></tr>
<tr><td><tt>socketReceiveBufferBytes</tt></td><td>65536</td><td>Socket Receive Buffer Bytes</td></tr>
<tr><td><tt>fetchMessageMaxBytes</tt></td><td>1048576</td><td>Fetch Message Max Bytes</td></tr>
<tr><td><tt>autoCommitEnable</tt></td><td>true</td><td>Auto Commit Enable</td></tr>
<tr><td><tt>autoCommitIntervalMs</tt></td><td>60000</td><td>Auto Commit Interval (ms)</td></tr>
<tr><td><tt>queuedMaxMessageChunks</tt></td><td>10</td><td>Queued Max Message Chunks</td></tr>
<tr><td><tt>rebalanceMaxRetries</tt></td><td>4</td><td>Rebalance Max Retries</td></tr>
<tr><td><tt>fetchMinBytes</tt></td><td>1</td><td>Fetch Min Bytes</td></tr>
<tr><td><tt>fetchWaitMaxMs</tt></td><td>100</td><td>Fetch Wait Max (ms)</td></tr>
<tr><td><tt>rebalanceBackoffMs</tt></td><td>2000</td><td>Rebalance Backoff (ms)</td></tr>
<tr><td><tt>refreshLeaderBackoffMs</tt></td><td>200</td><td>Refresh Leader Backoff (ms)</td></tr>
<tr><td><tt>autoOffsetReset</tt></td><td>largest</td><td>Auto Offset Reset</td></tr>
<tr><td><tt>consumerTimeoutMs</tt></td><td>-1</td><td>Consumer Timeout (ms)</td></tr>
<tr><td><tt>zookeeperSessionTimeoutMs</tt></td><td>6000</td><td>Zookeeper Session Timeout (ms)</td></tr>
<tr><td><tt>zookeeperConnectionTimeoutMs</tt></td><td>60000</td><td>Zookeeper Connection Timeout (ms)</td></tr>
<tr><td><tt>zookeeperSyncTimeMs</tt></td><td>2000</td><td>Zookeeper Sync Time (ms)</td></tr>
<tr><td><tt>requestRequiredAcks</tt></td><td>0</td><td>Request Required Acks</td></tr>
<tr><td><tt>requestTimeoutMs</tt></td><td>10000</td><td>Request Timeout (ms)</td></tr>
<tr><td><tt>producerType</tt></td><td>sync</td><td>Producer Type</td></tr>
<tr><td><tt>serializerClass</tt></td><td>kafka.serializer.DefaultEncoder</td><td>Serializer Class</td></tr>
<tr><td><tt>partitionerClass</tt></td><td>kafka.producer.DefaultPartitioner</td><td>Partitioner Class</td></tr>
<tr><td><tt>compressionCodec</tt></td><td>none</td><td>Compression Codec</td></tr>
<tr><td><tt>compressedTopics</tt></td><td>null</td><td>Compressed Topics</td></tr>
<tr><td><tt>messageSendMaxRetries</tt></td><td>3</td><td>Message Send Max Retries</td></tr>
<tr><td><tt>retryBackoffMs</tt></td><td>100</td><td>Retry Backoff (ms)</td></tr>
<tr><td><tt>topicMetadataRefreshIntervalMs</tt></td><td>600000</td><td>Topic Metadata Refresh Interval (ms)</td></tr>
<tr><td><tt>queueBufferingMaxMs</tt></td><td>5000</td><td>Queue Buffering Max Time (ms)</td></tr>
<tr><td><tt>queueBufferingMaxMessages</tt></td><td>10000</td><td>Queue Buffering Max Messages</td></tr>
<tr><td><tt>queueEnqueueTimeoutMs</tt></td><td>-1</td><td>Queue Enqueue Timeout (ms)</td></tr>
<tr><td><tt>batchNumMessages</tt></td><td>200</td><td>Batch Number of Messages</td></tr>
<tr><td><tt>sendBufferBytes</tt></td><td>102400</td><td>Send Buffer Bytes</td></tr>
<tr><td><tt>clientId</tt></td><td>kafka.DEFAULT_CLIENT_ID</td><td>Client Id</td></tr>
<tr><td><tt>keySerializerClass</tt></td><td>kafka.serializer.StringEncoder</td><td>Key Serializer Class</td></tr>
</table>

NOTE: More information and detailed descriptions of these configuration options can be found in the Kafka documentation.
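Because every attribute above has a default, an endpoint URI only needs to carry the options that differ from it. The following is a minimal sketch under that assumption. The `KafkaOptionsSketch` class is hypothetical and not part of camel-kafka, and only a handful of the documented defaults are copied in. It filters a set of desired settings down to the ones that actually need to appear in the query string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of camel-kafka.
class KafkaOptionsSketch {

    // A few defaults copied from the attribute list above; not the full set.
    static Map<String, String> documentedDefaults() {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("concurrentConsumers", "10");
        defaults.put("autoCommitEnable", "true");
        defaults.put("autoOffsetReset", "largest");
        defaults.put("producerType", "sync");
        defaults.put("requestRequiredAcks", "0");
        return defaults;
    }

    // Returns a query string containing only the settings that differ from the defaults.
    static String overridesQuery(Map<String, String> desired) {
        Map<String, String> defaults = documentedDefaults();
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> setting : desired.entrySet()) {
            boolean isDefault = setting.getValue().equals(defaults.get(setting.getKey()));
            if (!isDefault) {
                query.add(setting.getKey() + "=" + setting.getValue());
            }
        }
        return query.toString();
    }

    public static void main(String[] args) {
        Map<String, String> desired = new LinkedHashMap<>();
        desired.put("producerType", "async");       // differs from the default "sync"
        desired.put("concurrentConsumers", "10");   // same as the default, so it is dropped
        desired.put("autoOffsetReset", "smallest"); // differs from the default "largest"
        System.out.println(overridesQuery(desired));
    }
}
```

Settings that are not in the sketch's default map are always kept, since there is nothing to compare them against.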

Usage examples

The usage examples below are taken from the integration tests, which can be found alongside the source.

Simple Usage

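// Producer: send messages from direct:kaiotep to the "kaiot" topic asynchronously.
from("direct:kaiotep").to("kafka:kaiot?zkConnect=localhost:2181&metadataBrokerList=localhost:9092&producerType=async&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value);

// Consumer: read from the "kaiot" topic with the same group id and forward to mock:result.
from("kafka:kaiot?zkConnect=localhost:2181&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value).to("mock:result");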

Transfer Exchange

// Producer: transferExchange=true is set on the producing endpoint.
from("direct:ateioep").to("kafka:ateio?zkConnect=localhost:2181&metadataBrokerList=localhost:9092&transferExchange=true&producerType=async&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value);

// Consumer: transferExchange=true is set on the consuming endpoint as well.
from("kafka:ateio?zkConnect=localhost:2181&transferExchange=true&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value).to("mock:result");

Custom Partitioner

// Producer: partitionerClass selects the partitioner implementation used when sending.
from("direct:spuiaioutep").to("kafka:spuiaiout?zkConnect=localhost:2181&partitionerClass=org.apache.camel.component.kafka.partitioner.SimplePartitioner&producerType=async&metadataBrokerList=localhost:9092&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value);

// Consumer: no partitioner option is needed on the consuming side.
from("kafka:spuiaiout?zkConnect=localhost:2181&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value).to("mock:result");
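To illustrate what a partitioner decides, here is a minimal standalone sketch of key-based partition selection. It is an assumption made for illustration only: the `PartitionSketch` class is hypothetical, it is not the `SimplePartitioner` referenced above, and it does not implement Kafka's partitioner interface, which a real `partitionerClass` would have to do for the Kafka version in use.

```java
// Hypothetical illustration of key-based partition selection; not camel-kafka code.
class PartitionSketch {

    // Maps a message key to a partition index in the range [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        for (String key : new String[] {"order-1", "order-2", "order-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

The same key always maps to the same partition, which is what keeps related messages together on one partition.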

For more, refer to the full usage examples, which exist as integration tests in the camel-kafka repository.

More resources

For more information and in-depth configuration details, refer to the Apache Kafka documentation.