Solo  当前访客:3 登录 注册

喧哗博客-http://blog.xuahua.com

繁华过后的沉寂--技术经验分享
浏览次数: 95,037    文章总数: 91    评论总数: 3
标签:

【转载】spring cloud stream

转载自:https://mp.weixin.qq.com/s/QD8kmtv-feI95QI-vVcf5A

Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。

 

Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream) 本身内容很多,而且它还有很多外部的依赖,想要熟悉 SCS,必须要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来,文章将从围绕以下三点进行展开:

 

  • 什么是 Spring Messaging;

  • 什么是 Spring Integration;

  • 什么是 SCS 体系及其原理;

    Spring Messaging


    Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。

    • 比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header:

     

    package org.springframework.messaging;
    public interface Message<T> {
        T getPayload();
        MessageHeaders getHeaders();
    }

    • 消息通道 MessageChannel 用于接收消息,调用 send 方法可以将消息发送至该消息通道中 :

    @FunctionalInterface
    public interface MessageChannel {
        long INDEFINITE_TIMEOUT = -1;
        default boolean send(Message<?> message) {

             return send(message, INDEFINITE_TIMEOUT);

         }
         boolean send(Message<?> message, long timeout);
    }

    消息通道里的消息如何被消费呢? 
    • 由消息通道的子接口可订阅的消息通道 SubscribableChannel 实现,被 MessageHandler 消息处理器所订阅:

    public interface SubscribableChannel extends MessageChannel {
        boolean subscribe(MessageHandler handler);
        boolean unsubscribe(MessageHandler handler);
    }

    • MessageHandler 真正地消费/处理消息:

    @FunctionalInterface
    public interface MessageHandler {
        void handleMessage(Message<?> message) throws MessagingException;
    }

    Spring Messaging 内部在消息模型的基础上衍生出了其它的一些功能,如:

    1. 消息接收参数及返回值处理:消息接收参数处理器 HandlerMethodArgumentResolver 配合 @Header, @Payload 等注解使用;消息接收后的返回值处理器 HandlerMethodReturnValueHandler 配合 @SendTo 注解使用;

    2. 消息体内容转换器 MessageConverter

    3. 统一抽象的消息发送模板 AbstractMessageSendingTemplate

    4. 消息通道拦截器 ChannelInterceptor

     

    Spring Integration


    Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

    它提出了不少新的概念,包括消息路由 MessageRoute、消息分发 MessageDispatcher、消息过滤 Filter、消息转换 Transformer、消息聚合 Aggregator、消息分割 Splitter 等等。同时还提供了 MessageChannelMessageHandler 的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannelMessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

    这里为大家介绍几种消息的处理方式:
    • 消息的分割:

    • 消息的聚合:

     

    • 消息的过滤:

    • 消息的分发:

     

    接下来,我们以一个最简单的例子来尝试一下 Spring Integration:

    这段代码解释为:

     

    SubscribableChannel messageChannel =new DirectChannel(); // 1

    messageChannel.subscribe(msg-> { // 2
     System.out.println("receive: " +msg.getPayload());
    });

    messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3

     

    1. 构造一个可订阅的消息通道 messageChannel

    2. 使用 MessageHandler 去消费这个消息通道里的消息;

    3. 发送一条消息到这个消息通道,消息最终被消息通道里的 MessageHandler 所消费。

    最后控制台打印出: receive: msg from alibaba

    DirectChannel 内部有个 UnicastingDispatcher 类型的消息分发器,会分发到对应的消息通道 MessageChannel 中,从名字也可以看出来,UnicastingDispatcher 是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy 负载均衡策略,默认只有轮询的实现,可以进行扩展。

    我们对上段代码做一点修改,使用多个 MessageHandler 去处理消息:

    SubscribableChannel messageChannel = new DirectChannel();

    messageChannel.subscribe(msg -> {
         System.out.println("receive1: " + msg.getPayload());
    });

    messageChannel.subscribe(msg -> {
         System.out.println("receive2: " + msg.getPayload());
    });

    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

    由于 DirectChannel 内部的消息分发器是 UnicastingDispatcher 单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个 MessageHandler。控制台打印出:

    receive1: msg from alibaba
    receive2: msg from alibaba

    既然存在单播的消息分发器 UnicastingDispatcher,必然也会存在广播的消息分发器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler

    SubscribableChannel messageChannel = new PublishSubscribeChannel();

    messageChannel.subscribe(msg -> {
         System.out.println("receive1: " + msg.getPayload());
    });

    messageChannel.subscribe(msg -> {
         System.out.println("receive2: " + msg.getPayload());
    });

    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

    发送两个消息,都被所有的 MessageHandler 所消费。控制台打印:

    receive1: msg from alibaba
    receive2: msg from alibaba
    receive1: msg from alibaba
    receive2: msg from alibaba

     

    Spring Cloud Stream


    SCS与各模块之间的关系是:

    • SCS 在 Spring Integration 的基础上进行了封装,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;

    • SCS 与 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;

    • SCS 与 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置类;

    • SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。

    • SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。

    Binder 是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumerbindProducer ,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit BinderKafka BinderSpring Cloud Alibaba 内部已经实现了 RocketMQ Binder

    从图中可以看出,Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。我们来看一个最简单的使用 RocketMQ Binder 的例子,然后分析一下它的底层处理原理:

    • 启动类及消息的发送:

    @SpringBootApplication
    @EnableBinding({ Source.class, Sink.class }) // 1
    public class SendAndReceiveApplication {
     
        public static void main(String[] args) {
            SpringApplication.run(SendAndReceiveApplication.class, args);
        }
     
           @Bean // 2
        public CustomRunner customRunner() {
            return new CustomRunner();
        }

        public static class CustomRunner implements CommandLineRunner {

            @Autowired
            private Source source;

            @Override
            public void run(String... args) throws Exception {
                int count = 5;
                for (int index = 1; index <= count; index++) {
                    source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3
                }
            }
        }
    }

    • 消息的接收:

    @Service
    public class StreamListenerReceiveService {

        @StreamListener(Sink.INPUT) // 4
        public void receiveByStreamListener1(String receiveMsg) {
            System.out.println("receiveByStreamListener: " + receiveMsg);
        }

    }

    这段代码很简单,没有涉及到 RocketMQ 相关的代码,消息的发送和接收都是基于 SCS 体系完成的。如果想切换成 RabbitMQ 或 Kafka,只需修改配置文件即可,代码无需修改。

    我们来分析下这段代码的原理:

     

    1. @EnableBinding 对应的两个接口属性 Source 和 Sink 是 SCS 内部提供的。SCS 内部会基于 Source 和 Sink 构造 BindableProxyFactory,且对应的 output 和 input 方法返回的 MessageChannel 是 DirectChannel。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。

    public interface Source {
        String OUTPUT = "output";
        @Output(Source.OUTPUT)
        MessageChannel output();
    }
    public interface Sink {
        String INPUT = "input";
        @Input(Sink.INPUT)
        SubscribableChannel input();
    }

    配置文件里 bindings 的 name 为 output 和 input,对应 SourceSink 接口的方法上的注解里的 value:

    spring.cloud.stream.bindings.output.destination=test-topic
    spring.cloud.stream.bindings.output.content-type=text/plain
    spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

    spring.cloud.stream.bindings.input.destination=test-topic
    spring.cloud.stream.bindings.input.content-type=text/plain
    spring.cloud.stream.bindings.input.group=test-group1

    2. 构造 CommandLineRunner,程序启动的时候会执行 CustomRunner run 方法。

    3. 调用 Source 接口里的 output 方法获取 DirectChannel,并发送消息到这个消息通道中。这里跟之前 Spring Integration 章节里的代码一致。

    • Source 里的 output 发送消息到 DirectChannel 消息通道之后会被 AbstractMessageChannelBinder#SendingHandler 这个 MessageHandler 处理,然后它会委托给 AbstractMessageChannelBinder#createProducerMessageHandler 创建的 MessageHandler 处理(该方法由不同的消息中间件实现);

    • 不同的消息中间件对应的 AbstractMessageChannelBinder#createProducerMessageHandler 方法返回的 MessageHandler 内部会把 Spring Message 转换成对应中间件的 Message 模型并发送到对应中间件的 broker;

    4. 使用 @StreamListener 进行消息的订阅。请注意,注解里的 Sink.input 对应的值是 "input",会根据配置文件里 binding 对应的 name 为 input 的值进行配置:

    • 不同的消息中间件对应的 AbstractMessageChannelBinder#createConsumerEndpoint 方法会使用 Consumer 订阅消息,订阅到消息后内部会把中间件对应的 Message 模型转换成 Spring Message;

    • 消息转换之后会把 Spring Message 发送至 name 为 input 的消息通道中;

    • @StreamListener 对应的 StreamListenerMessageHandler 订阅了 name 为 input 的消息通道,进行了消息的消费;

    这个过程文字描述有点啰嗦,用一张图总结一下(黄色部分涉及到各消息中间件的 Binder 实现以及 MQ 基本的订阅发布功能):

    SCS 章节的最后,我们来看一段 SCS 关于消息的处理方式的一段代码:

    @StreamListener(value = Sink.INPUTcondition = "headers['index']=='1'")
    public void receiveByHeader(Message msg) {
         System.out.println("receive by headers['index']=='1': " + msg);
    }

    @StreamListener(value = Sink.INPUTcondition = "headers['index']=='9999'")
    public void receivePerson(@Payload Person person) {
         System.out.println("receive Person: " + person);
    }

    @StreamListener(value = Sink.INPUT)
    public void receiveAllMsg(String msg) {
         System.out.println("receive allMsg by StreamListener. content: " + msg);
    }

    @StreamListener(value = Sink.INPUT)
    public void receiveHeaderAndMsg(@Header("index"String indexMessage msg) {
         System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
    }

    有没有发现这段代码跟 Spring MVC Controller 中接收请求的代码很像? 实际上他们的架构都是类似的,Spring MVC 对于 Controller 中参数和返回值的处理类分别是org.springframework.web.method.support.HandlerMethodArgumentResolverorg.springframework.web.method.support.HandlerMethodReturnValueHandler

    Spring Messaging 中对于参数和返回值的处理类之前也提到过,分别是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverorg.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler

    它们的类名一模一样,甚至内部的方法名也一样。

     

    总结


    上图是 SCS 体系相关类说明的总结,关于 SCS 以及 RocketMQ Binder 更多相关的示例,可以参考 RocketMQ Binder Demos(Demos 地址:点击“阅读原文”,包含了消息的聚合、分割、过滤;消息异常处理;消息标签、SQL过滤;同步、异步消费等等。

     

springcloud 性能优化

项目上线,准备进行压力测试,自己写了个测试工具,对项目的某个接口进行压测;

服务器配置 : 阿里云 centos7  2核 8g 

运行项目关系:

  接口---》zuul 处理--》consumer项目 --》实际处理业务项目

通过测试工具压力测试,发现只要上20就会提示服务器异常,发现有些默认值是20,所以把自己的一些处理方法整理一下,作为以后借鉴。

 

1,zuul 优化配置如下:

ribbon:
  ReadTimeout: 15000
  ConnectTimeout: 15000
  MaxAutoRetries: 0 ##最大允许重试次数
  MaxAutoRetriesNextServer: 1 ##切换重试实例个数
zuul:
  host:
    connect-timeout-millis: 20000
    socket-timeout-millis: 20000
    max-per-route-connections: 500
    max-total-connections: 2000
    ##信号量默认100,此处不设置,只要并发达到100就会提示无法获取semaphonre
  semaphore:
    max-semaphores: 2000

2. consumer项目,此项目主要是处理与业务相关的接口,合成接口。所以使用了hystrix,ribbion

###############
feign:
  hystrix:
    enabled: true
  httpclient:
    enabled: true
    max-connetctions: 2000
    max-connections-per-route: 500
  okhttp:
    enabled: false
ribbon:
  ReadTimeout: 15000
  ConnectTimeout: 15000
  MaxAutoRetries: 0
  MaxAutoRetriesNextServer: 1
hystrix:
  threadpool:
    SERVICEMEMBER:
      coreSize: 500
      maximumSize: 1000
  command:
    SERVICEMEMBER: ##自己定义的服务名,单独设置参数
      fallback:
        ### 是否启用降级服务。默认true
        enabled: false

      ## 是否使用断路器来跟踪其健康指标和熔断请求,默认true
      circuitBreaker:
        enabled: true
        ##一个统计窗口内熔断触发的最小个数/10s
        requestVolumeThreshold: 500
        ##熔断多少秒后去尝试请求
        sleepWindowInMilliseconds: 5000
        ###失败率达到多少百分比后熔断
        errorThresholdPercentage: 80
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 20000
          strategy: THREAD
    default: ##默认其他服务使用参数
      fallback:
        ### 是否启用降级服务。默认true
        enabled: false

        ## 是否使用断路器来跟踪其健康指标和熔断请求,默认true
      circuitBreaker:
        enabled: true
        ##一个统计窗口内熔断触发的最小个数/10s
        requestVolumeThreshold: 2000
        ##熔断多少秒后去尝试请求
        sleepWindowInMilliseconds: 5000
        ###失败率达到多少百分比后熔断
        errorThresholdPercentage: 80

      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 20000
          strategy: THREAD

 

以上,记录下来。

 

 

 

 

 

 

springcloud config配置中心 Cannot clone or checkout repository: git@git.example.com/xxx/xxx.git

出现此问题的原因有可能是

  1,私钥,公钥问题

比如,我遇到的就是之前私钥是以前生成的OPENSSH 私钥,与现在要求生成RSA 私钥公钥不符

所以,请认准私钥头部的 -----BEGIN RSA PRIVATE KEY-----

 

 2,gitlab仓库,未配置公钥

  可以在  gitlab 右上角,点开头像,settings --左侧栏 ssh-keys 

  把在服务器生成的公钥 id_rsa.pub 如果你另外生成的可能是xxx.pub复制过来

 

3,配置文件私钥配置

  一定要注意格式

  

以上为生产服务器配置使用,

 

另外,生成私钥命令为

ssh-keygen -t rsa -C "你的邮箱" 

 

如果已经存在id_rsa 可以指定另外一个

ssh-keygen -t rsa -C "你的邮箱" -f git

 

springcloud turbine 集群配置

1, 创建程序,修改pom.xml

   

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-turbine</artifactId>
        </dependency>

 

2,启动类

    

@EnableTurbine
@SpringBootApplication
@EnableCircuitBreaker
@EnableDiscoveryClient
@EnableHystrixDashboard
public class ShineHystrixApplication {
     
    
    public static void main(String[] args) {
        SpringApplication.run(ShineHystrixApplication.class, args);
        System.out.print("++++++++++++++++++++++++熔断器中心启动成功!!++++++++++++++++++++++++");

    }
}

 

3,配置文件

  

server:
  port: 8768
spring:
  application:
    name: server-hystrix
  security:
    user:
      name: admin
      password: shine
eureka:
  client:
    service-url:
      defaultZone: http://${spring.security.user.name}:${spring.security.user.password}@localhost:8761/eureka/
    fetch-registry: true
    register-with-eureka: true
turbine:
  cluster-name-expression: new String("default")
  combine-host-port: true
##要监控的服务,各服务以,分隔 app-config: PlaceSerProFegin,ContentSerProFegin,MemberSerProFegin management: endpoints: web: exposure: include: hystrix.stream

 

 

 

 

springcloud eureka 开启密码配置,其他应用注册时,提示403

当其他应用注册eureka时,提示403,如下

failure with status code 403; retrying on another server if available

 

原因:

 springcloud2.0以后,密码校验开启了crf。所以把crf关闭即可。

增加配置类

@EnableWebSecurity
public class WebSecurityConfigurer extends WebSecurityConfigurerAdapter {
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.csrf().disable();
        super.configure(http);
    }
}

 

重启应用即可。

springcloud config 属性文件中文乱码 有更新!

问题,springcloud config-server服务配置git 存储文件中有包含中文,读取属性文件以后,出现乱码!!

git仓库中的属性文件名 locale_cn.properties

如下所示

shine.login.test=测试项目
shine.login.abc=终于来个Ok


请求 http://localhost:8769/locale/cn

出现乱码。




解决:

1,加载属性文件类,重写

OriginTrackedPropertiesLoader类。

2,自己定义加载属性文件

我采用,自己定义加载属性文件

具体如下:

  

public class MyPropertiesHandler implements PropertySourceLoader {
    @Override
    public String[] getFileExtensions() {
        return new String[]{"properties", "xml"};
    }

    @Override
    public List<PropertySource<?>> load(String name, Resource resource) throws IOException {
        Map<String, ?> properties = this.loadProperties(resource);
        return properties.isEmpty() ? Collections.emptyList() : Collections.singletonList(new OriginTrackedMapPropertySource(name, properties));
    }

    private Map<String, ?> loadProperties(Resource resource) throws IOException {
        String filename = resource.getFilename();
        return (Map) (filename != null && filename.endsWith(".xml") ? PropertiesLoaderUtils.loadProperties(resource) : loadPro(resource));
    }

    
    private Map<String, ?> loadPro(Resource resource) {
        Map<String, String> proMap = new HashMap<>();
        try {
            InputStream inputStream = resource.getInputStream();
            List<Byte> byteList = new LinkedList<Byte>();
            byte[] readByte = new byte[1024];
            int length;
            while ((length = inputStream.read(readByte)) > 0) {
                for (int i = 0; i < length; i++) {
                    byteList.add(readByte[i]);
                }
            }
            byte[] allBytes = new byte[byteList.size()];
            int index = 0;
            for (Byte soloByte : byteList) {
                allBytes[index] = soloByte;
                index += 1;
            }
            String str = new String(allBytes, "UTF-8");
            String[] strs = StringUtils.splitByWholeSeparator(str, "\n");
            for (String tmp_str : strs) {
                if (StringUtils.isNotBlank(tmp_str)) {
                    String[] tmpstr = StringUtils.splitByWholeSeparator(tmp_str, "=");
                    proMap.put(tmpstr[0], tmpstr[1]);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return proMap;

    }
}

 

同时,增加配置在 src/resources目录下增加文件夹

META-INF/spring.factories

添加一行代码 

org.springframework.boot.env.PropertySourceLoader=com.shine.configserver.MyPropertiesHandler

 

重启应用,再刷新 http://localhost:8769/locale/cn 即可看到正确的

 

 

 

 

 

 

 

 

 


 

 
















springcloud feign 多文件上传 有更新!

网上也有不同的上传,但是对于多文件上传的解决方案比较少,参考了https://blog.csdn.net/qq_32953079/article/details/8163081

1,导入引用 的文件 

   

<dependency>
            <groupId>io.github.openfeign.form</groupId>
            <artifactId>feign-form</artifactId>
        </dependency>
        <dependency>
            <groupId>io.github.openfeign.form</groupId>
            <artifactId>feign-form-spring</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
        </dependency>

 

2,定义上传接口

 

@FeignClient(value = "PLACESERVICEPROVIDER", configuration = FeignMultipartSupportConfig.class)
public interface FileService {

 @PostMapping(value = "/file/uploadMulti", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResult uploadMulti(@RequestPart(value = "files") MultipartFile[] files) throws ServiceException;
}

@Configuration
public class FeignMultipartSupportConfig {
    @Autowired
    private ObjectFactory<HttpMessageConverters> messageConverters;

    @Bean
    public Encoder feignEncoder() {
        return new SpringMultipartEncoder(new SpringEncoder(messageConverters));
    }
}

public class SpringMultipartEncoder extends FormEncoder {

    public SpringMultipartEncoder() {
        this(new Default());
    }


    /**
     * Constructor with specified delegate encoder.
     *
     * @param delegate delegate encoder, if this encoder couldn't encode object.
     */
    public SpringMultipartEncoder(Encoder delegate) {
        super(delegate);

        MultipartFormContentProcessor processor = (MultipartFormContentProcessor) getContentProcessor(ContentType.MULTIPART);
        processor.addWriter(new SpringSingleMultipartFileWriter());
        processor.addWriter(new SpringManyMultipartFilesWriter());
    }


    @Override
    public void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException {
        // 单MultipartFile判断
        if (bodyType.equals(MultipartFile.class)) {
            MultipartFile file = (MultipartFile) object;
            Map data = Collections.singletonMap(file.getName(), object);
            super.encode(data, MAP_STRING_WILDCARD, template);
            return;
        } else if (bodyType.equals(MultipartFile[].class)) {
            // MultipartFile数组处理
            MultipartFile[] file = (MultipartFile[]) object;
            if (file != null) {
                Map data = Collections.singletonMap(file.length == 0 ? "" : file[0].getName(), object);
                super.encode(data, MAP_STRING_WILDCARD, template);
                return;
            }
        }
        // 其他类型调用父类默认处理方法
        super.encode(object, bodyType, template);

    }
}


3,服务端实现接口

  

@RestController
public class FileServiceImpl implements FileService {
   @Override
    public BaseResult uploadMulti(MultipartFile[] files) throws ServiceException{
  //do Something
}

}

 

 

 一般默认的上传文件大小,会限制1m大小,可以增加配置

具体设置参见 这篇文章的设置

 

 

 

 

 

springcloud Feign @RequestBody 有更新!

1,API工程接口定义

   

@FeginClient(vlaue="memberprovider",fallback=LoginServiceFeginHystrix.class)
public interface LoginService{

  /***
     * 注册新用户
     * */
    @PostMapping(value = "/login/reg")
    public MemberDTO regMember(@RequestBody MemberVo memberVo) throws BusinessException, ServiceException;

}
    熔断服务

  @Slf4j

@Component
public class LoginServiceFeginHystrix implements LoginService {

   @Override
    public MemberDTO regMember(MemberVo memberVo) throws BusinessException, ServiceException {
        log.error("====<<<<进入熔断服务..{}", "LoginServiceFeginHystrix.regMember()");
        return null;
    }
}

}

  

2, 服务端工程实现

   

 @Override
 public MemberDTO regMember(@RequestBody MemberVo memberVo) throws BusinessException, ServiceException {
 //do somethings
}

 

3,消费端调用

 

 @PostMapping("/login/reg")
    public BaseResult reg(MemberVo memberVo) throws BusinessException, ServiceException {
 MemberDTO memberDTO = loginService.regMember(memberVo);
......
}

 

要注意要点:

  服务端实现一般使用工具生成,没有带 @RequestBody

 另外一个,注意接口定义的包名,与消费端包扫描的路径,确认消费端是否会把接口定义的路径扫描进去。

如,接口路径 com.xuahua.api.common.xx

 而消费端的启动类在 com.xuahua.consumer.xx 这样就会扫不到,需要手动添加@ComponetScan(basePackages="")

 缺少这个导致请求过来的值为空。

 

 

 

springcloud fegin 支持文件上传 有更新!

1,API 工程

   定义上传文件接口

   

@FeignClient(value = "PLACESERVICEPROVIDER", configuration = FileService.MutilSupportConfig.class)
public interface FileService {

    @PostMapping(value = "/file/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResult upload(@RequestPart(value = "file") MultipartFile file) throws ServiceException;

@Configuration
class MutilSupportConfig {

        @Autowired
ObjectFactory<HttpMessageConverters> messageConvertos;

@Bean
public Encoder feignFormEncoder() {
            return new SpringFormEncoder(new SpringEncoder(messageConvertos));
}
    }
}




 

2,服务方工程

  具体实现如下

  

@Service
@RestController
public class FileServiceImpl implements FileService {
    @Override
    public BaseResult upload(MultipartFile file) throws ServiceException {
        log.info("====<<<<进入上传处理接口");
        if (file == null) {
            return BaseResultGenerator.failure("上传文件file不能为空");
        }
        return BaseResultGenerator.success(file.getOriginalFilename());
    }
}

  

 

3,消费方工程

  具体实现如下

 

@Slf4j
@RestController
public class FileController {

    @Autowired
    FileService fileService;

    @PostMapping(value = "/file/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResult upload(MultipartFile file) throws ServiceException {
        log.debug("=============>进入上传接口{}", file.getName());
        return fileService.upload(file);
    }
}



 

4. 要引入的包

 <dependency>
                <groupId>io.github.openfeign.form</groupId>
                <artifactId>feign-form-spring</artifactId>
                <version>3.2.2</version>
            </dependency>
            <dependency>
                <groupId>io.github.openfeign.form</groupId>
                <artifactId>feign-form</artifactId>
                <version>3.2.2</version>
            </dependency>

 

 注意,对于上传大文件,超过1M时,会提示文件过大。

在配置文件中需要设置

spring.servlet.multipart.enabled=true

spring.servlet.multipart.max-file-size=10MB

spring.servlet.multipart.max-request-size=10MB

 

原来1.5.9配置为

spring.http.multipart.enabled=true

spring.http.multipart.max-file-size=10MB

spring.http.multipart.max-request-size=10MB

 

在使用Fegin上传时,不光要处理zuul的文件大小,文件超时问题,还要注意也要修改服务方的文件大小以及超时。


        

公告

喧哗博客--繁华过后的沉寂--技术经验分享^-^
Copyright (c) 2009-2019, b3log.org & hacpai.com