springcloud-stream(3.X) 的基本使用

发布时间:2022-02-15 06:29:01
修改时间:2022-07-20 14:52:50
总阅读数:957
今日阅读数:0
昨日日阅读数:1
字数:3356

springcloud-stream(3.X) 的基本使用

使用rabbit

增加依赖

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

消费者

消费者配置(application.yaml)

spring:
  cloud:
    stream:
      defaultBinder: rabbit
      bindings:
        msgAction-in-0:
          destination: msgEvent
          group: msgQueue

    function:
      definition: msgAction

  rabbitmq:
    addresses: localhost:5672
    username: test
    password: test

消费者代码(EventAction.java)

package com.qc.msg.consumer.stream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;

/**
 * @author :zzq
 * @version :1.0
 * @date : 2022/2/7 22:02
 */
@Slf4j
@Configuration
public class EventAction {

    @Bean
    public Consumer<String> msgAction() {
        return string -> log.info("收到消息:{}",string);
    }

}

生产者

生产者配置(application.yaml)

spring:
  cloud:
    stream:
      defaultBinder: rabbit
      bindings:
        msgAction-out-0:
          destination: msgEvent
          group: msgQueue

  rabbitmq:
    addresses: localhost:5672
    username: test
    password: test

 

生产者代码(MsgAction.java)

package com.qc.msg.producer.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

/**
 * @author :zzq
 * @version :1.0
 * @date :2022/2/7 22:12
 */
@Component
public class MsgAction {

    @Autowired
    private StreamBridge streamBridge;

    public void send(String msg){
        boolean sendRes = streamBridge.send("msgAction-out-0", msg);
    }

}

使用kafka

新增依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

修改配置

消费者配置(application.yaml)

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        msgAction-in-0:
          destination: msgEvent
          group: msgQueue

      kafka:
        binder:
          brokers: localhost
          replication-factor: 1


    function:
      definition: msgAction

生产者配置(application.yaml)

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        msgAction-out-0:
          destination: msgEvent
          group: msgQueue

      kafka:
        binder:
          brokers: localhost
          replication-factor: 1