Spring Cloud Stream (3.x) 的基本使用
使用rabbit
增加依赖
<!-- Lombok: supplies the @Slf4j annotation used by the consumer code below. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
    <!-- Lombok is an annotation processor needed at compile time only. -->
    <scope>provided</scope>
</dependency>
<!-- RabbitMQ binder starter for Spring Cloud Stream. -->
<!-- No <version> is declared here; presumably it is managed by a parent POM/BOM (confirm). -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
消费者
消费者配置(application.yaml)
# Consumer side (RabbitMQ): binds the input of the msgAction function to the msgEvent destination.
spring:
  cloud:
    stream:
      defaultBinder: rabbit
      bindings:
        # Binding name follows the <functionName>-in-<index> convention.
        msgAction-in-0:
          destination: msgEvent
          group: msgQueue
    # NOTE(review): the original indentation was lost, so the nesting of this key is
    # reconstructed as spring.cloud.function.definition; confirm against the running project
    # (spring.cloud.stream.function.definition is the other possible reading).
    function:
      definition: msgAction
  # Broker connection settings (spring.rabbitmq.*).
  rabbitmq:
    addresses: localhost:5672
    username: test
    password: test
消费者代码(EventAction.java)
package com.qc.msg.consumer.stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;
/**
 * Spring configuration that registers the message-consuming function bean.
 *
 * <p>The bean method is named {@code msgAction}, matching the {@code msgAction-in-0}
 * binding and the {@code definition: msgAction} entry in the consumer configuration.
 *
 * @author zzq
 * @version 1.0
 * @date 2022/2/7 22:02
 */
@Slf4j
@Configuration
public class EventAction {

    /**
     * Creates the consumer that handles incoming messages.
     *
     * @return a consumer that writes each received message to the log at INFO level
     */
    @Bean
    public Consumer<String> msgAction() {
        return message -> {
            log.info("收到消息:{}", message);
        };
    }
}
生产者
生产者配置(application.yaml)
# Producer side (RabbitMQ): declares the output binding that StreamBridge sends to.
spring:
  cloud:
    stream:
      defaultBinder: rabbit
      bindings:
        msgAction-out-0:
          destination: msgEvent
          # NOTE(review): per the Spring Cloud Stream binding-property docs, `group` applies
          # only to inbound (consumer) bindings; on this output binding it likely has no
          # effect. Kept as in the original; confirm whether it can be dropped.
          group: msgQueue
  # Broker connection settings (spring.rabbitmq.*).
  rabbitmq:
    addresses: localhost:5672
    username: test
    password: test
生产者代码(MsgAction.java)
package com.qc.msg.producer.stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
/**
 * Producer-side component that publishes messages through {@link StreamBridge}.
 *
 * @author zzq
 * @version 1.0
 * @date 2022/2/7 22:12
 */
@Component
public class MsgAction {

    /** Name of the output binding declared in the producer's application.yaml. */
    private static final String OUTPUT_BINDING = "msgAction-out-0";

    @Autowired
    private StreamBridge streamBridge;

    /**
     * Sends the given message to the {@code msgAction-out-0} output binding.
     *
     * <p>{@code StreamBridge.send} returns a {@code boolean}; this method returns nothing,
     * so the result is not inspected here.
     * NOTE(review): if a failed send must be surfaced, check that result explicitly.
     *
     * @param msg the message payload to publish
     */
    public void send(String msg) {
        streamBridge.send(OUTPUT_BINDING, msg);
    }
}
使用kafka
新增依赖
<!-- Kafka binder starter for Spring Cloud Stream. -->
<!-- No <version> is declared here; presumably it is managed by a parent POM/BOM (confirm). -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
修改配置
消费者配置(application.yaml)
# Consumer side (Kafka): binds the input of the msgAction function to the msgEvent destination.
spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        # Binding name follows the <functionName>-in-<index> convention.
        msgAction-in-0:
          destination: msgEvent
          group: msgQueue
      kafka:
        binder:
          # No port is given for the broker.
          brokers: localhost
          replication-factor: 1
    # NOTE(review): the original indentation was lost, so the nesting of this key is
    # reconstructed as spring.cloud.function.definition; confirm against the running project
    # (spring.cloud.stream.function.definition is the other possible reading).
    function:
      definition: msgAction
生产者配置(application.yaml)
# Producer side (Kafka): declares the output binding that StreamBridge sends to.
spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        msgAction-out-0:
          destination: msgEvent
          # NOTE(review): per the Spring Cloud Stream binding-property docs, `group` applies
          # only to inbound (consumer) bindings; on this output binding it likely has no
          # effect. Kept as in the original; confirm whether it can be dropped.
          group: msgQueue
      kafka:
        binder:
          # No port is given for the broker.
          brokers: localhost
          replication-factor: 1