이 글은 이병찬 님의 강의를 요약한 내용입니다. 원본 강의는 링크에서 확인하실 수 있습니다.
아주 인기있는 스트리밍 플랫폼
Spring에서 Kafka를 사용하려면 Spring Kafka를 사용하면 된다.
@KafkaListner
어노테이션을 붙이는 것만으로도 웹 MVC에서 리퀘스트 매핍을 한 것과 같은 역할을 할 수 있다.
컨트롤러와 이 listner가 하는 일이 동일하다고 보면 된다.
@KafkaListener(topics="myTopic")
public void listen(ConsumerRecord<?, ?> record) throws Exception {
logger.info(record.toString());
// Do Something...
}
ConsumerRecord
형태로 만들어서 전달이 된다.하지만 한 가지 약점이 존재
구조
producer ---publish---> stream <---subscribe--- consumer
consumer
특정 토픽에 대해서 kafka에 구독을 하고, 이것을 flux로 받아서 한 건 한 건씩 비동기로 처리를 하겠어!
producer
그저 flux create를 하는 것 뿐이야! 데이터만 밀어 넣어주면 돼!
public void process() {
consume().flatMap(this::recordToEventObject)
.flatMap(this::saveEvent)
.flatMap(this::getReceivers)
.flatMap(this::dataProcessing)
.flatMap(this::saveResult)
.subscribe();
}
sampleFirst()
와 groupBy()
를 이용한다.public void process() {
consume().flatMap(this::recordToEventObject)
.groupBy(Message::key)
.flatMap(flux -> flux.sampleFirst(Duration.ofSeconds(30)))
.flatMap(this::saveEvent)
.flatMap(this::getReceivers)
.flatMap(this::notify)
.flatMap(this::saveResult)
.subscribe();
}
기준 시간 동안 발생한 여러 이벤트는 하나의 메시지로 모아서 통지하자.
buffer()
를 이용한다.public void process() {
consume().flatMap(this::recordToEventObject)
.groupBy(Message::key)
.flatMap(flux -> flux.buffer(Duration.ofSeconds(30)))
.flatMap(this::notify)
.flatMap(this::saveResult)
.subscribe();
}
Flow의 flow
subscribe()
onSubscribe()
request(n)
public class CustomSubscriber extends BaseSubscriber<~> {
@Override
protected void bookOnSubscribe(Subscription subscription) {
request(10);
}
@Overrice
protected void hookOnNext(ReceiverRecord<String, String> record) {
Mono.just(record)
.flatMap(/* Do something */)
.subscribe(r -> {
offsetSink.next(record);
request(10);
});
}
}
public class CustomSubscriber extends BaseSubscriber<~> {
private FluxSink<~> offsetSink;
@Override
protected void hookOnNext(ReceiverREcord<String, String> record) {
...
Flux.<~>create(sink -> offsetSink = sink)
.reduce(-1L, (last, r) -> last < r.offset()
? commit(r)
: last)
.subscribe();
}
}