這篇文章主要介紹在Spring Boot應用程序中如何使用Apache Kafka,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
第1步:生成我們的項目: Spring Initializr 來生成我們的項目。我們的項目將提供Spring MVC / Web支持和Apache Kafka支持。
第2步:發布/讀取Kafka主題中的消息:
public class User { private String name; private int age; public User(String name, int age) { this.name = name; this.age = age; } }
第3步:通過application.yml
配置文件配置Kafka:
我們需要創建配置文件。我們需要以某種方式配置我們的Kafka生產者和消費者,以便能夠發布和讀取與主題相關的消息。相比建立一個使用@Configuration
標注的Java類,我們可以直接使用配置文件application.properties或application.yml。Spring Boot讓我們避免像過去一樣編寫的所有樣板代碼,同時為我們提供了更加智能的配置應用程序的方法,如下所示:
server: port: 9000 spring: kafka: consumer: bootstrap: localhost:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
第4步:創建一個生產者,創建生產者會將我們的消息寫入該主題。
public class Producer { private static final Logger logger = LoggerFactory.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplatekafkaTemplate; public void sendMessage(String message) { logger.info(String.format( "#### -> Producing message -> %s", message)); this.kafkaTemplate.send(TOPIC, message); } }
自動連接autowire
到 KafkaTemplate
,使用它將消息發布到主題 - 這就是消息的生產者!
第5步:創建一個消費者,消費者是負責根據您自己的業務邏輯的需求閱讀處理消息的消息的服務。要進行設置,請輸入以下內容:
@Service public class Consumer { private final Logger logger = LoggerFactory.getLogger(Producer.class); @KafkaListener(topics = "users", groupId = "group_id") public void consume(String message) throws IOException { logger.info(String.format("#### -> Consumed message -> %s", message)); } }
在這里,我們告訴我們的方法void consume(String message)
訂閱用戶的主題,并將每條消息發送到應用程序日志。在您的實際應用程序中,您可以按照業務需要的方式處理消息。
第6步:創建REST控制器,們已經擁有了能夠消費Kafka消息所需的全部內容。
為了充分展示我們創建的所有內容的工作原理,我們需要創建一個具有單一端點的控制器。消息將發布到此端點,然后由我們的生產者處理。然后,我們的消費者將通過登錄到控制臺來捕獲并處理它。
@RestController @RequestMapping(value = "/kafka") public class KafkaController { private final Producer producer; @Autowired KafkaController(Producer producer) { this.producer = producer; } @PostMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message) { this.producer.sendMessage(message); } }
讓我們使用cURL將消息發送給Kafka:
curl -X POST -F 'message=test' http://localhost:9000/kafka/publish
以上是“在Spring Boot應用程序中如何使用Apache Kafka”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注創新互聯行業資訊頻道!