kafka 실행
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
스프링 서버 실행
$ mvn spring-boot:run
$ curl http://localhost:8080/start
Mono
: 한 개의 값을 전달하는 reactor 객체Flux
: 여러 개의 값을 전달하는 reactor 객체
map()
: Flux
데이터를 1-1 방식으로 변환한다. 인자로는 Flux
의 타입을 받는다.flatMap()
: Flux
데이터를 1-N 방식으로 변환하여 시퀀스를 생성한다. 인자로는 Flux
의 타입을 받는다.donOnNext()
: Flux
가 Subscriber에 next 신호를 발생할 때 호출된다. 인자로는 Flux
의 타입을 받는다.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.nhn.forward2019</groupId>
<artifactId>reactive-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>reactive-kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor.kafka/reactor-kafka -->
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.2.0.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
io.projectreactor.kafka:reactor-kafka
org.springframework.kafka:spring-kafka
KafkaManager
@Component
public class KafkaManager {
private static final String BOOTSTRAP_SERVERS = "localhost:9092"; // kafka host
private final Map<String, Object> consumerProps; // consumer settings
private final Map<String, Object> producerProps; // producer settings
public KafkaManager() {
this.consumerProps = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
ConsumerConfig.CLIENT_ID_CONFIG, "consumer",
ConsumerConfig.GROUP_ID_CONFIG, "group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
this.producerProps = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
ProducerConfig.CLIENT_ID_CONFIG, "producer",
ProducerConfig.ACKS_CONFIG, "all",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}
public Flux<SenderResult<String>> producer(final Publisher<? extends SenderRecord<String, String, String>> publisher) {
final SenderOptions<String, String> options = SenderOptions.create(producerProps);
return KafkaSender.create(options)
.send(publisher);
}
public Flux<ReceiverRecord<String, String>> consumer(final String topic) {
final ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(consumerProps)
.subscription(List.of(topic));
return KafkaReceiver.create(options)
.receive();
}
}
ProducerConfig
BOOTSTRAP_SERVERS_CONFIG
: kafka serverCLIENT_ID_CONFIG
: client idACKS_CONFIG
: 프로듀서가 메세지를 보내고 그 메세지를 kafka가 잘 받았는지 확인할 것인지 아닌지KEY_SERIALIZER_CLASS_CONFIG
: broker에게 보낼 객체를 바이트 배열로 변환하기 위해 serializing을 한다.VALUE_SERIALIZER_CLASS_CONFIG
: broker에게 보낼 객체를 바이트 배열로 변환하기 위해 serializing을 한다.BOOTSTRAP_SERVERS_CONFIG
: kafka serverCLIENT_ID_CONFIG
: client idGROUP_ID_CONFIG
: group idKEY_DESERIALIZER_CLASS_CONFIG
: broker가 보낸 바이트 배열을 객체로 변환하기 위해 deserializing을 한다.VALUE_DESERIALIZER_CLASS_CONFIG
: broker가 보낸 바이트 배열을 객체로 변환하기 위해 deserializing을 한다.AUTO_OFFSET_RESET_CONFIG
: auto.offset.reset
producer()
SenderOptions
옵션을 생성한다.KafkaSender.create(SenderOptions<K,V> options)
consumer()
ReceiverOptions
옵션을 생성한다.KafkaReceiver.create(ReceiverOptions<K,V> options)
@RestController
@RequestMapping("/")
public class DemoController {
private DemoService service;
private AtomicBoolean running;
public DemoController(DemoService service) {
this.service = service;
this.running = new AtomicBoolean(false);
}
@GetMapping("/start")
public Mono<String> start() {
return running.compareAndSet(false, true)
? service.start()
: Mono.just("Already Running");
}
@GetMapping("/stop")
public Mono<String> stop() {
return running.compareAndSet(true, false)
? service.stop()
: Mono.just("Not Running Now");
}
}
@RestController
@Controller
에 Data를 반환하는 @ResponseBody
가 추가된 어노테이션@RequestMapping({NAME})
NAME
과 일치하는 path는 모두 해당 컨트롤러로 전달된다.@GetMapping({NAME})
NAME
과 일치하는 path는 모두 해당 메소드로 전달된다.Atomic
Type
@Service
public class DemoService {
private static final Logger logger = LoggerFactory.getLogger(DemoService.class);
private static final String SERVICENAME = "demo";
KafkaManager kafkaManager;
Disposable disposable;
public DemoService(KafkaManager kafkaManager) {
this.kafkaManager = kafkaManager;
}
public Mono<String> start() {
consume();
produce();
return Mono.just("START");
}
public Mono<String> stop() {
dispose(disposable);
return Mono.just("STOP");
}
protected void dispose(Disposable disposable) {
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
}
/***** produce *****/
protected void produce() {
final Flux<SenderRecord<String, String, String>> records = generateSource()
.doOnNext(comp -> logger.info("Create - name: {}\tmajor: {}",comp.getT1(), comp.getT2()))
.map(Object::toString)
.map(i -> SenderRecord.create(new ProducerRecord<>(SERVICENAME, i, i), i));
kafkaManager.producer(records)
.subscribe();
}
/***** consume *****/
protected void consume() {
disposable = kafkaManager.consumer(SERVICENAME)
.subscribe();
}
/***** general *****/
protected Flux<Tuple2<String, String>> generateSource(){
return Flux.just(
Tuples.of("eun", "computer"),
Tuples.of("lobster", "statistics"),
Tuples.of("zooho", "computer"),
Tuples.of("hyeon", "mechanical"),
Tuples.of("nayng", "electronic"))
.delayElements(Duration.ofMillis(1000));
}
}
@Service
start()
method
consume()
과 produce()
메소드르 실행한다. produce()
protected void produce() {
final Flux<SenderRecord<String, String, String>> records = generateSource()
.doOnNext(comp -> logger.info("Create - name: {}\tmajor: {}",comp.getT1(), comp.getT2()))
.map(Object::toString)
.map(i -> SenderRecord.create(new ProducerRecord<>(SERVICENAME, i, i), i));
kafkaManager.producer(records)
.subscribe();
}
Flux
가 값을 1초마다 publish한다.
doOnNext()
에서 Flux
객체를 받아 컨슘한 후 로그를 찍는다.SenderRecord.create(ProducerRecord<topic, K, V> record, T correlationMetadata)
correlationMetadata
는 kafka로 전송되진 않지만 SenderResult
를 이 레코드에 일치시키기 위한 응답에 포함된 부가적인 상관 관계 메타데이터이다.ProducerRecord<>(String topic, K key, V value)
kafkaManager.producer()
는 Flux<SenderRecord<String, String>>
를 반환한다.
Flux.subscribe()
는 해당 Flux
를 subscribe하고 무한대의 요구를 요청할 수 있다.consume()
protected void consume() {
disposable = kafkaManager.consumer(SERVICENAME)
.subscribe();
}
kafkaManager.producer()
는 Flux<ReceiverRecord<String, String>>
를 반환한다.
Flux.subscribe()
는 해당 Flux
를 subscribe하고 무한대의 요구를 요청할 수 있다.