加入收藏 | 设为首页 | 会员中心 | 我要投稿 厦门站长网 (https://www.0592zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 云计算 > 正文

Kafka Java客户端代码示范

发布时间:2022-05-16 12:30:28 所属栏目:云计算 来源:互联网
导读:kafka是一种高吞吐量的分布式发布订阅消息系统 kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统
         kafka是一种高吞吐量的分布式发布订阅消息系统
 
         kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)
 
         当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。
 
         高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理
 
测试环境
 
kafka_2.10-0.8.1.1 3个节点做的集群
 
zookeeper-3.4.5 一个实例节点
 
代码示例
 
消息生产者代码示例
 
复制
import java.util.Collections;  
import java.util.Date;  
import java.util.Properties;  
import java.util.Random;  
   
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
   
/**  
 * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example  
 * @author Fung  
 *  
 */
public class ProducerDemo {  
    public static void main(String[] args) {  
        Random rnd = new Random();  
        int events=100;  
   
        // 设置配置属性  
        Properties props = new Properties();  
        props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        // key.serializer.class默认为serializer.class  
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");  
        // 可选配置,如果不配置,则使用默认的partitioner  
        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");  
        // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失  
        // 值为0,1,-1,可以参考  
        // http://kafka.apache.org/08/configuration.html  
        props.put("request.required.acks", "1");  
        ProducerConfig config = new ProducerConfig(props);  
   
        // 创建producer  
        Producer<String, String> producer = new Producer<String, String>(config);  
        // 产生并发送消息  
        long start=System.currentTimeMillis();  
        for (long i = 0; i < events; i++) {  
            long runtime = new Date().getTime();  
            String ip = "192.168.2." + i;//rnd.nextInt(255);  
            String msg = runtime + ",www.example.com," + ip;  
            //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0  
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(  
                    "page_visits", ip, msg);  
            producer.send(data);  
        }  
        System.out.println("耗时:" + (System.currentTimeMillis() - start));  
        // 关闭producer  
        producer.close();  
 
消息消费者代码示例
 
复制
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
   
import kafka.consumer.Consumer;  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  
   
/**  
 * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example  
 *   
 * @author Fung  
 *  
 */
public class ConsumerDemo {  
    private final ConsumerConnector consumer;  
    private final String topic;  
    private ExecutorService executor;  
   
    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {  
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));  
        this.topic = a_topic;  
    }  
   
    public void shutdown() {  
        if (consumer != null)  
            consumer.shutdown();  
        if (executor != null)  
            executor.shutdown();  
    }  
   
    public void run(int numThreads) {  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put(topic, new Integer(numThreads));  
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer  
                .createMessageStreams(topicCountMap);  
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);  
   
        // now launch all the threads  
        executor = Executors.newFixedThreadPool(numThreads);  
   
        // now create an object to consume the messages  
        //  
        int threadNumber = 0;  
        for (final KafkaStream stream : streams) {  
            executor.submit(new ConsumerMsgTask(stream, threadNumber));  
            threadNumber++;  
        }  
    }  
   
    private static ConsumerConfig createConsumerConfig(String a_zookeeper,  
            String a_groupId) {  
        Properties props = new Properties();  
        props.put("zookeeper.connect", a_zookeeper);  
        props.put("group.id", a_groupId);  
        props.put("zookeeper.session.timeout.ms", "400");  
        props.put("zookeeper.sync.time.ms", "200");  
        props.put("auto.commit.interval.ms", "1000");  
   
        return new ConsumerConfig(props);  
    }  
   
    public static void main(String[] arg) {  
        String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };  
        String zooKeeper = args[0];  
        String groupId = args[1];  
        String topic = args[2];  
        int threads = Integer.parseInt(args[3]);  
   
        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);  
        demo.run(threads);  
   
        try {  
            Thread.sleep(10000);  
        } catch (InterruptedException ie) {  
   
        }  
        demo.shutdown();  
 
消息处理类
 
复制
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
   
public class ConsumerMsgTask implements Runnable {  
    private KafkaStream m_stream;  
    private int m_threadNumber;  
   
    public ConsumerMsgTask(KafkaStream stream, int threadNumber) {  
        m_threadNumber = threadNumber;  
        m_stream = stream;  
    }  
   
    public void run() {  
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();  
        while (it.hasNext())  
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));  
        System.out.println("Shutting down Thread: " + m_threadNumber);  
 
Partitioner类示例
 
复制
import kafka.producer.Partitioner;  
import kafka.utils.VerifiableProperties;  
   
public class PartitionerDemo implements Partitioner {  
    public PartitionerDemo(VerifiableProperties props) {  
   
    }  
   
    @Override
    public int partition(Object obj, int numPartitions) {  
        int partition = 0;  
        if (obj instanceof String) {  
            String key=(String)obj;  
            int offset = key.lastIndexOf('.');  
            if (offset > 0) {  
                partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;  
            }  
        }else{  
            partition = obj.toString().length() % numPartitions;  
        }  
           
        return partition;  

(编辑:厦门站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!