appllication.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: adamsoft
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
설정 클래스를 추가합니다.
KafkaConfiguration
// 환경 설정 클래스라는 것을 알려주는 어노테이션
// 인스턴스를 생성해서 직접 관리하지 않고 프레임워크가 생명 주기를 관리 : 제어의 역전
@Configuration
public class KafkaConfiguration {
//설정 파일에서 값을 가지고 와서 주입하는 코드
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
//메시지를 게시하는 프로듀서의 설정
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory(configs);
}
//카프카 사용을 위한 인스턴스를 생성해주는 메서드
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
제어의 역전 (IoC)
일반적으로, 개발자는 클래스 인스턴스를 직접 생성하고 그 수명 주기를 관리합니다. 하지만 IoC를 사용하면, 클래스는 개발자가 작성하지만 인스턴스 생성과 수명 주기 관리는 프레임워크나 컨테이너가 맡습니다. 이를 통해 개발자는 디자인 패턴이나 객체의 수명 주기에 대해 고민하지 않고, 빠르게 개발을 진행할 수 있습니다.
의존성 주입 (Dependency Injection, DI)
클래스 내부에서 필요한 인스턴스를 직접 생성하는 대신, 외부에서 생성된 인스턴스를 생성자나 setter를 통해 주입받는 방식입니다. 이 기법을 사용하면 인스턴스 간의 결합도가 낮아져서, 하나의 변경이 다른 인스턴스에 영향을 미치는 것을 최소화할 수 있습니다.
메시지를 게시하는 클래스를 생성합니다.
KafkaProducer
//서비스 클래스 라는 것을 명시하고 인스턴스를 자동으로 생성해달라고 하는 어노테이션
@Service
//자동 주입되는 인스턴스를 대입받는 생성자를 만들어 달라는 어노테이션
@RequiredArgsConstructor
public class KafkaProducer {
//토픽 이름 설정
private static final String TOPIC = "exam-topic";
//의존성 주입을 받기 위한 어노테이션
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//로그 출력하기 위한 인스턴스를 생성
private final Logger log = LoggerFactory.getLogger(getClass());
//메시지 전송하는 메서드
public void sendMessage(String name, int age) {
log.info("Produce message : {}{}", name, age);
//System.out.println("전송된 메시지:" + name + age);
String message = "{\"name\":" + "\"" + name + "\"" + ", \"age\":" + age + "}";
//실제 메시지 전송
this.kafkaTemplate.send(TOPIC, message);
}
}
메시지 구독 클래스를 생성합니다
KafkaConsumer
@Service
public class KafkaConsumer {
private final Logger log = LoggerFactory.getLogger(getClass());
//exam-topic에 들어오는 메시지를 읽어내는 메서드: 비동기적으로 백그라운드에서 수행
//토픽이 들어오면 자동으로 호출
@KafkaListener(topics = "exam-topic", groupId = "adamsoft")
public void consume(String message) throws IOException {
log.info("Consumed message : {}", message);
JSONObject messageObj = new JSONObject(message);
log.info(messageObj.getString("name"));
log.info(messageObj.getInt("age") + "");
}
}
사용자의 요청을 처리하는 클래스 생성합니다.
KafkaController
//데이터를 리턴하는 컨트롤러: REST API를 만들기 위한 어노테이션
@RestController
//요청 경로
@RequestMapping(value = "/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaController {
@Autowired
private KafkaProducer producer;
//POST 방식으로 요청이 오면 처리
@PostMapping
@ResponseBody
public String sendMessage(@RequestParam("name") String name, @RequestParam("age") int age) {
this.producer.sendMessage(name, age);
return "success";
}
}
build.gradle 파일의 dependencies 에서 JSON 파싱을 위한 라이브러리의 의존성을 설정합니다.
// JSON 파싱을 위한 라이브러리의 의존성을 설정
implementation 'org.json:json:20190722'
Postman에서 API를 이용해서 POST 방식으로 name 과 age 의 값을 설정해서 전송합니다.
인텔리제이 콘솔 창에서 Consumer가 메시지를 받아와서 메시지가 출력된 것을 확인할 수 있습니다. 실제 구현에서는 하나의 애플리케이션이 구독과 게시를 모두 수행할 수도 있고, 둘 중 하나만 수행할 수도 있습니다.
'BackEnd > JAVA SPRING' 카테고리의 다른 글
[Spring Boot] Docker + Github Actions CI/CD 구축 (7) | 2024.10.21 |
---|---|
[Kafka] Apache Kafka (3) | 2024.10.08 |