利用生产者消费者设计模式来完成的拉取Kafka数据的简单示例。
package com.bay.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.log4j.BasicConfigurator;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Properties;
/**
* Author by BayMin, Date on 2019/7/25.
*/
public class KafkaConsumerDemo {
public static void main(String[] args) {
BasicConfigurator.configure();
ShareResource shareResource = new ShareResource();
new Thread(new Producer(shareResource), "Thread_Producer").start();
new Thread(new Consumer(shareResource), "Thread_Consumer").start();
}
}
class Producer implements Runnable {
private ShareResource shareResource;
Producer(ShareResource shareResource) {
this.shareResource = shareResource;
}
@SuppressWarnings("InfiniteLoopStatement")
@Override
public void run() {
String topic = "test";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop011:9092;hadoop012:9092;hadoop013:9092");
props.put("group.id", "group01");
props.put("zookeeper.connect", "hadoop010:2181,hadoop011:2181,hadoop012:2181,hadoop013:2181");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
int flag = 0;
while (true) {
ConsumerRecords<String, String> polls = consumer.poll(100);
if (polls.count() == 0)
flag += 1;
else
flag = 0;
for (ConsumerRecord<String, String> poll : polls) {
shareResource.push(poll.value().concat("\n"));
}
if (flag > 10)
shareResource.forcePush();
}
}
}
class Consumer implements Runnable {
private ShareResource shareResource;
Consumer(ShareResource shareResource) {
this.shareResource = shareResource;
}
@SuppressWarnings("InfiniteLoopStatement")
@Override
public void run() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
FileOutputStream fos = null;
long startTime = System.currentTimeMillis();
String fileName = "C:\\Users\\lenovo\\Desktop\\12\\Kafka_data_" + format.format(startTime) + ".txt";
try {
fos = new FileOutputStream(fileName);
while (true) {
long currentTime = System.currentTimeMillis();
if ((currentTime - startTime) >= ShareResource.getGAP()) {
fileName = "C:\\Users\\lenovo\\Desktop\\12\\Kafka_data_" + format.format(currentTime) + ".txt";
fos = new FileOutputStream(fileName);
startTime = currentTime;
}
shareResource.pop(fos);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} finally {
try {
assert fos != null;
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class ShareResource {
private StringBuffer data = new StringBuffer();
private final static int BUFFER_SIZE = 1024 * 1024;
private final static int GAP = 5 * 60 * 1000;
private static boolean isEmpty = true;
private static boolean isForce = false;
void push(String message) {
synchronized ("") {
try {
if (this.data.toString().getBytes().length < BUFFER_SIZE)
isEmpty = true;
if (!isEmpty)
"".wait();
this.data.append(message);
if (this.data.toString().getBytes().length >= BUFFER_SIZE) {
isEmpty = false;
"".notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void forcePush() {
synchronized ("") {
isForce = true;
"".notify();
}
}
void pop(FileOutputStream fos) {
synchronized ("") {
try {
if (this.data.toString().getBytes().length >= BUFFER_SIZE)
isEmpty = false;
if (isEmpty)
"".wait();
if (this.data.length() != 0) {
fos.write(this.data.toString().getBytes());
this.data.delete(0, this.data.length());
isEmpty = true;
if (!isForce)
"".notify();
isForce = false;
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
static int getGAP() {
return GAP;
}
}
模拟生成Kafka数据:
package com.bay.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.BasicConfigurator;
import java.util.Date;
import java.util.Properties;
/**
* Author by BayMin, Date on 2019/7/25.
*/
public class KafkaProducerDemo {
public static void main(String[] args) {
BasicConfigurator.configure();
String topic = "test";
Properties properties = new Properties();
properties.put("bootstrap.servers", "hadoop011:9092;hadoop012:9092;hadoop013:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> stringStringKafkaProducer;
stringStringKafkaProducer = new KafkaProducer<>(properties);
// 模拟生成数据
for (int i = 1; i < 10000; i++) {
String msg = i + " " + new Date().toString();
stringStringKafkaProducer.send(new ProducerRecord<>(topic, msg));
}
stringStringKafkaProducer.close();
}
}
Comments | NOTHING