Solo  当前访客:4 登录 注册

喧哗博客-http://blog.xuahua.com

繁华过后的沉寂--技术经验分享
浏览次数: 94,390    文章总数: 91    评论总数: 3

【转载】spring cloud stream

转载自:https://mp.weixin.qq.com/s/QD8kmtv-feI95QI-vVcf5A

Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。

 

Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream) 本身内容很多,而且它还有很多外部的依赖,想要熟悉 SCS,必须要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来,文章将从围绕以下三点进行展开:

 

  • 什么是 Spring Messaging;

  • 什么是 Spring Integration;

  • 什么是 SCS 体系及其原理;

    Spring Messaging


    Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。

    • 比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header:

     

    package org.springframework.messaging;
    public interface Message<T> {
        T getPayload();
        MessageHeaders getHeaders();
    }

    • 消息通道 MessageChannel 用于接收消息,调用 send 方法可以将消息发送至该消息通道中 :

    @FunctionalInterface
    public interface MessageChannel {
        long INDEFINITE_TIMEOUT = -1;
        default boolean send(Message<?> message) {

             return send(message, INDEFINITE_TIMEOUT);

         }
         boolean send(Message<?> message, long timeout);
    }

    消息通道里的消息如何被消费呢? 
    • 由消息通道的子接口可订阅的消息通道 SubscribableChannel 实现,被 MessageHandler 消息处理器所订阅:

    public interface SubscribableChannel extends MessageChannel {
        boolean subscribe(MessageHandler handler);
        boolean unsubscribe(MessageHandler handler);
    }

    • MessageHandler 真正地消费/处理消息:

    @FunctionalInterface
    public interface MessageHandler {
        void handleMessage(Message<?> message) throws MessagingException;
    }

    Spring Messaging 内部在消息模型的基础上衍生出了其它的一些功能,如:

    1. 消息接收参数及返回值处理:消息接收参数处理器 HandlerMethodArgumentResolver 配合 @Header, @Payload 等注解使用;消息接收后的返回值处理器 HandlerMethodReturnValueHandler 配合 @SendTo 注解使用;

    2. 消息体内容转换器 MessageConverter

    3. 统一抽象的消息发送模板 AbstractMessageSendingTemplate

    4. 消息通道拦截器 ChannelInterceptor

     

    Spring Integration


    Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

    它提出了不少新的概念,包括消息路由 MessageRoute、消息分发 MessageDispatcher、消息过滤 Filter、消息转换 Transformer、消息聚合 Aggregator、消息分割 Splitter 等等。同时还提供了 MessageChannelMessageHandler 的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannelMessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

    这里为大家介绍几种消息的处理方式:
    • 消息的分割:

    • 消息的聚合:

     

    • 消息的过滤:

    • 消息的分发:

     

    接下来,我们以一个最简单的例子来尝试一下 Spring Integration:

    这段代码解释为:

     

    SubscribableChannel messageChannel =new DirectChannel(); // 1

    messageChannel.subscribe(msg-> { // 2
     System.out.println("receive: " +msg.getPayload());
    });

    messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3

     

    1. 构造一个可订阅的消息通道 messageChannel

    2. 使用 MessageHandler 去消费这个消息通道里的消息;

    3. 发送一条消息到这个消息通道,消息最终被消息通道里的 MessageHandler 所消费。

    最后控制台打印出: receive: msg from alibaba

    DirectChannel 内部有个 UnicastingDispatcher 类型的消息分发器,会分发到对应的消息通道 MessageChannel 中,从名字也可以看出来,UnicastingDispatcher 是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy 负载均衡策略,默认只有轮询的实现,可以进行扩展。

    我们对上段代码做一点修改,使用多个 MessageHandler 去处理消息:

    SubscribableChannel messageChannel = new DirectChannel();

    messageChannel.subscribe(msg -> {
         System.out.println("receive1: " + msg.getPayload());
    });

    messageChannel.subscribe(msg -> {
         System.out.println("receive2: " + msg.getPayload());
    });

    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

    由于 DirectChannel 内部的消息分发器是 UnicastingDispatcher 单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个 MessageHandler。控制台打印出:

    receive1: msg from alibaba
    receive2: msg from alibaba

    既然存在单播的消息分发器 UnicastingDispatcher,必然也会存在广播的消息分发器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler

    SubscribableChannel messageChannel = new PublishSubscribeChannel();

    messageChannel.subscribe(msg -> {
         System.out.println("receive1: " + msg.getPayload());
    });

    messageChannel.subscribe(msg -> {
         System.out.println("receive2: " + msg.getPayload());
    });

    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
    messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

    发送两个消息,都被所有的 MessageHandler 所消费。控制台打印:

    receive1: msg from alibaba
    receive2: msg from alibaba
    receive1: msg from alibaba
    receive2: msg from alibaba

     

    Spring Cloud Stream


    SCS与各模块之间的关系是:

    • SCS 在 Spring Integration 的基础上进行了封装,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;

    • SCS 与 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;

    • SCS 与 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置类;

    • SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。

    • SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。

    Binder 是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumerbindProducer ,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit BinderKafka BinderSpring Cloud Alibaba 内部已经实现了 RocketMQ Binder

    从图中可以看出,Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。我们来看一个最简单的使用 RocketMQ Binder 的例子,然后分析一下它的底层处理原理:

    • 启动类及消息的发送:

    @SpringBootApplication
    @EnableBinding({ Source.class, Sink.class }) // 1
    public class SendAndReceiveApplication {
     
        public static void main(String[] args) {
            SpringApplication.run(SendAndReceiveApplication.class, args);
        }
     
           @Bean // 2
        public CustomRunner customRunner() {
            return new CustomRunner();
        }

        public static class CustomRunner implements CommandLineRunner {

            @Autowired
            private Source source;

            @Override
            public void run(String... args) throws Exception {
                int count = 5;
                for (int index = 1; index <= count; index++) {
                    source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3
                }
            }
        }
    }

    • 消息的接收:

    @Service
    public class StreamListenerReceiveService {

        @StreamListener(Sink.INPUT) // 4
        public void receiveByStreamListener1(String receiveMsg) {
            System.out.println("receiveByStreamListener: " + receiveMsg);
        }

    }

    这段代码很简单,没有涉及到 RocketMQ 相关的代码,消息的发送和接收都是基于 SCS 体系完成的。如果想切换成 RabbitMQ 或 Kafka,只需修改配置文件即可,代码无需修改。

    我们来分析下这段代码的原理:

     

    1. @EnableBinding 对应的两个接口属性 Source 和 Sink 是 SCS 内部提供的。SCS 内部会基于 Source 和 Sink 构造 BindableProxyFactory,且对应的 output 和 input 方法返回的 MessageChannel 是 DirectChannel。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。

    public interface Source {
        String OUTPUT = "output";
        @Output(Source.OUTPUT)
        MessageChannel output();
    }
    public interface Sink {
        String INPUT = "input";
        @Input(Sink.INPUT)
        SubscribableChannel input();
    }

    配置文件里 bindings 的 name 为 output 和 input,对应 SourceSink 接口的方法上的注解里的 value:

    spring.cloud.stream.bindings.output.destination=test-topic
    spring.cloud.stream.bindings.output.content-type=text/plain
    spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

    spring.cloud.stream.bindings.input.destination=test-topic
    spring.cloud.stream.bindings.input.content-type=text/plain
    spring.cloud.stream.bindings.input.group=test-group1

    2. 构造 CommandLineRunner,程序启动的时候会执行 CustomRunner run 方法。

    3. 调用 Source 接口里的 output 方法获取 DirectChannel,并发送消息到这个消息通道中。这里跟之前 Spring Integration 章节里的代码一致。

    • Source 里的 output 发送消息到 DirectChannel 消息通道之后会被 AbstractMessageChannelBinder#SendingHandler 这个 MessageHandler 处理,然后它会委托给 AbstractMessageChannelBinder#createProducerMessageHandler 创建的 MessageHandler 处理(该方法由不同的消息中间件实现);

    • 不同的消息中间件对应的 AbstractMessageChannelBinder#createProducerMessageHandler 方法返回的 MessageHandler 内部会把 Spring Message 转换成对应中间件的 Message 模型并发送到对应中间件的 broker;

    4. 使用 @StreamListener 进行消息的订阅。请注意,注解里的 Sink.input 对应的值是 "input",会根据配置文件里 binding 对应的 name 为 input 的值进行配置:

    • 不同的消息中间件对应的 AbstractMessageChannelBinder#createConsumerEndpoint 方法会使用 Consumer 订阅消息,订阅到消息后内部会把中间件对应的 Message 模型转换成 Spring Message;

    • 消息转换之后会把 Spring Message 发送至 name 为 input 的消息通道中;

    • @StreamListener 对应的 StreamListenerMessageHandler 订阅了 name 为 input 的消息通道,进行了消息的消费;

    这个过程文字描述有点啰嗦,用一张图总结一下(黄色部分涉及到各消息中间件的 Binder 实现以及 MQ 基本的订阅发布功能):

    SCS 章节的最后,我们来看一段 SCS 关于消息的处理方式的一段代码:

    @StreamListener(value = Sink.INPUTcondition = "headers['index']=='1'")
    public void receiveByHeader(Message msg) {
         System.out.println("receive by headers['index']=='1': " + msg);
    }

    @StreamListener(value = Sink.INPUTcondition = "headers['index']=='9999'")
    public void receivePerson(@Payload Person person) {
         System.out.println("receive Person: " + person);
    }

    @StreamListener(value = Sink.INPUT)
    public void receiveAllMsg(String msg) {
         System.out.println("receive allMsg by StreamListener. content: " + msg);
    }

    @StreamListener(value = Sink.INPUT)
    public void receiveHeaderAndMsg(@Header("index"String indexMessage msg) {
         System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
    }

    有没有发现这段代码跟 Spring MVC Controller 中接收请求的代码很像? 实际上他们的架构都是类似的,Spring MVC 对于 Controller 中参数和返回值的处理类分别是org.springframework.web.method.support.HandlerMethodArgumentResolverorg.springframework.web.method.support.HandlerMethodReturnValueHandler

    Spring Messaging 中对于参数和返回值的处理类之前也提到过,分别是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverorg.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler

    它们的类名一模一样,甚至内部的方法名也一样。

     

    总结


    上图是 SCS 体系相关类说明的总结,关于 SCS 以及 RocketMQ Binder 更多相关的示例,可以参考 RocketMQ Binder Demos(Demos 地址:点击“阅读原文”,包含了消息的聚合、分割、过滤;消息异常处理;消息标签、SQL过滤;同步、异步消费等等。

     

Netty Webscoket 实现聊天室 有更新!

一。服务器端

1,服务器应用重启,初始化,绑定端口

public interface Server {

    /**
     * 初始化服务器
     */
    void init();

    /**
     * 启动服务器
     */
    void start();

    /**
     * 关闭服务器
     */
    void shutdown();
}

 

 

@Component
public class ChatServer implements Server {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private DefaultEventLoopGroup defaultGroup;

// Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket, (有点像门卫)然后把这些socket传给worker线程池。
// 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socket。

 private NioEventLoopGroup bossGroup;

// Worker线程:Worker线程执行所有的异步I/O,即处理操作

 private NioEventLoopGroup workGroup;


 private ChannelFuture future;

 private ServerBootstrap bootstrap;

 
 private ScheduledExecutorService executorService;

@Value("${HOST}")
 private String host;

@Value("${PORT}")
 private int port;

@Autowired
private UserAuthHandler authHandler;

@Autowired
private MessageHandler messageHandler;

@Autowired
private ChannelManager manager;

@PostConstruct
 @Override
public void init() {
 logger.info("server init");
 int cpus = Runtime.getRuntime().availableProcessors();


defaultGroup = new DefaultEventLoopGroup(8, new ThreadFactory() {
 private AtomicInteger index = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
 return new Thread(r, "DEFAULTGROUP" + index.incrementAndGet());
}
 });

bossGroup = new NioEventLoopGroup(cpus, new ThreadFactory() {
 private AtomicInteger index = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
 return new Thread(r, "BOSSGROUP" + index.incrementAndGet());
}
 });

workGroup = new NioEventLoopGroup(cpus * 10, new ThreadFactory() {
 private AtomicInteger index = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
 return new Thread(r, "WORKGROUP" + index.incrementAndGet());
}
 });

bootstrap = new ServerBootstrap();

executorService = Executors.newScheduledThreadPool(2);

}

 @Override
public void start() {
 logger.info("server start");
bootstrap.group(bossGroup, workGroup)
 .channel(NioServerSocketChannel.class)
 //TCP测链路检测
.option(ChannelOption.SO_KEEPALIVE, true)
 //禁止使用Nagle算法
.option(ChannelOption.TCP_NODELAY, true)
 //初始化服务端可连接队列大小
.option(ChannelOption.SO_BACKLOG, 1024)
 .localAddress(new InetSocketAddress(port))
 .childHandler(new ChannelInitializer<SocketChannel>() {

 @Override
protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(defaultGroup,
//请求解码器
new HttpServerCodec(),
//将多个消息转换成单一的消息对象
new HttpObjectAggregator(65536),
//支持异步发送大的码流
new ChunkedWriteHandler(),
//定时检测链路是否读空闲
new IdleStateHandler(60, 0, 0),
//认证Handler
authHandler,
//消息Handler
messageHandler

);
}
 });

 try {
 future = bootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) future.channel().localAddress();
logger.info("QuarkChat start success ,host is :" + addr.getHostName() + ",port is:" + addr.getPort());

/**
 * 定时扫描Channel 关闭失效的Channel
 */
executorService.scheduleAtFixedRate(new Runnable() {
 @Override
public void run() {
 logger.info("scheduleAtFixedRate to close channel");
manager.scanNotActiveChannel();
}
 //initialDelay:延迟180秒执行,period:任务执行的间隔周期
}, 180, 180, TimeUnit.SECONDS);

/**
 * 定时向客户端发送Ping进行心跳检测
 */
executorService.scheduleAtFixedRate(new Runnable() {
 @Override
public void run() {
 logger.info("scheduleAtFixedRate to ping");
manager.broadPing();
}
 }, 60, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
 logger.error("Quark Chat fail ", e);
Thread.currentThread().interrupt();
}
 }

 @PreDestroy
 @Override
public void shutdown() {
 if (defaultGroup != null) {
 defaultGroup.shutdownGracefully();
}
 if (executorService != null) {
 executorService.shutdown();
}
 bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}

}

 

 

 

二。消息体参数

public class ClientProtocol{

/**
自己定义文件体头,也可以不用
*/
private int MAGIC;

/**
消息体请求,响应编码
如 0x01 ping 消息请求标识
   0x02 pong 服务器响应ping请求
   0x03 auth 用户验证请求标识
   0x04 authresp 服务器响应用户验证响应
   0x05 message 用户发送消息请求标识
   0x06 messageresp 服务器响应用户发送消息响应
   0x07 reconn 用户重连请求标识
   0x08 reconnresp 服务器响应重连请求响应
   0x10 sysmessage 系统发送消息标识
   0x11 useroff 用户离线通知标识
   0x12 useroffresp 系统通知用户
 
*/
private byte type;

/**聊天室id*/
private int roomid ;
/***
发送的消息内容
*/
private msg;
/**目标对象**/
private int target;
}

 

三。消息处理

 

 

@ChannelHandler.Sharable
@Scope("prototype")
@Component
public class UserAuthHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = LoggerFactory.getLogger(UserAuthHandler.class);

    private WebSocketServerHandshaker handshaker;

    @Value("${WEBSOCKET_URL}")
    private String WEBSOCKET_URL;

    @Autowired
    private ChannelManager manager;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Http请求(第一次握手)
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        }
        //处理WebSocket请求
        else if (msg instanceof WebSocketFrame) {
            handleWebSocket(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 内部链路检测
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //当通道空闲时由IdleStateHandler触发的用户事件
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // 判断Channel是否读空闲, 读空闲时移除Channel
            if (event.state().equals(IdleState.READER_IDLE)) {
                final String address = NettyUtil.parseChannelRemoteAddr(ctx.channel());
                logger.warn("Netty Server UserAuthHandler: IDLE exception :{}", address);

            }
        }
    }

    /**
     * HTTP握手反馈
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        //判断是否是WebSocket协议
        if (!request.decoderResult().isSuccess() || !"websocket".equals(request.headers().get("Upgrade"))) {
            logger.warn("protobuf don't support WebSocket");
            ctx.channel().close();
            return;
        }
        WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(
                WEBSOCKET_URL, null, true);
        handshaker = handshakerFactory.newHandshaker(request);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            // 动态加入websocket的编解码处理
            handshaker.handshake(ctx.channel(), request);
            // 存储已经连接的Channel
            manager.addChannel(ctx.channel());
        }
    }

    /**
     * WebSocket反馈
     *
     * @param ctx
     * @param frame
     */
    private void handleWebSocket(ChannelHandlerContext ctx, WebSocketFrame frame) {
        //判断是否是关闭链路的命令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            manager.removeChannel(ctx.channel());
            logger.info("Have a WebSocket Channel Close");
            return;
        }

        //判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            logger.info("ping message : ", frame.content().retain());
            ctx.writeAndFlush(new PingWebSocketFrame(frame.content().retain()));
            return;
        }

        //判断是否是Pong消息
        if (frame instanceof PongWebSocketFrame) {
            logger.info("pong message :", frame.content().retain());
            ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        }

        //仅支持文本消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(frame.getClass().getName() + "frame type not supported!!!");
        }
        String message = ((TextWebSocketFrame) frame).text();
       ClientProtocol clientProto = JSON.parseObject(message, new TypeReference<ClientProtocol>() {
        });
     
        byte type = clientProto.getType();
        Channel channel = ctx.channel();
        int roomid = clientProto.getRoomid();
        if (clientProto.getMAGIC() != 
            ChatProtocol.getMAGIC()) {
            return;//过滤格式不正确的消息
        }
        switch (type) {
          
            default:
              return;
        }

        //MessageHandler处理
        ctx.fireChannelRead(frame.retain());
    }

 

 

微信公众号分享接口 权限校验填坑 有更新!

微信公众号分享,使用 http://res2.wx.qq.com/open/js/jweixin-1.4.0.js  调用jssdk的时候,权限校验提示config:fail 或者无效签名。

总结以上可能会出错的原因

1, 在公众号后台,功能设置-业务域名要设成 m.xxx.com JS域名也要设成 m.xxx.com

2,通过 appId,appSecret 获取 access_token,此access_token要在服务器缓存.

地址 

https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=APPID&secret=APPSECRET

3,通过 access_token 获取jsapi_ticket ,此jsapi_ticket 要在服务器缓存。

地址 

https://api.weixin.qq.com/cgi-bin/ticket/getticket?access_token=ACCESS_TOKEN&type=jsapi

4,分享接口校验签名,签名顺序 

"jsapi_ticket=" + jsapiticket + "&noncestr=" + noncestr + "&timestamp=" + timestamp + "&url=" + url

注意此处为不写 nonstr
而在jssdk中权限签名



wx.config({
    debug: true, // 开启调试模式,调用的所有api的返回值会在客户端alert出来,若要查看传入的参数,可以在pc端打开,参数信息会通过log打出,仅在pc端时才会打印。
    appId: '', // 必填,公众号的唯一标识
    timestamp: , // 必填,生成签名的时间戳
    nonceStr: '', // 必填,生成签名的随机串
    signature: '',// 必填,签名
    jsApiList: [] // 必填,需要使用的JS接口列表
});

注意红色,粗体大小写。

原因:

当前要分享的页面地址,如果是从其他微信分享过来的,地址栏上面会带有二个参数 from=groupmessage&isappinstalled=0 这样的二个参数。

如果你把这二个参数带上去签名,那么肯定会提示无效签名。

如果你只是截取地址,参与签名也会提示无效签名。

正确做法,再做一次转发,主动把微信后面带的参数拿掉为止。

 

springcloud 性能优化

项目上线,准备进行压力测试,自己写了个测试工具,对项目的某个接口进行压测;

服务器配置 : 阿里云 centos7  2核 8g 

运行项目关系:

  接口---》zuul 处理--》consumer项目 --》实际处理业务项目

通过测试工具压力测试,发现只要上20就会提示服务器异常,发现有些默认值是20,所以把自己的一些处理方法整理一下,作为以后借鉴。

 

1,zuul 优化配置如下:

ribbon:
  ReadTimeout: 15000
  ConnectTimeout: 15000
  MaxAutoRetries: 0 ##最大允许重试次数
  MaxAutoRetriesNextServer: 1 ##切换重试实例个数
zuul:
  host:
    connect-timeout-millis: 20000
    socket-timeout-millis: 20000
    max-per-route-connections: 500
    max-total-connections: 2000
    ##信号量默认100,此处不设置,只要并发达到100就会提示无法获取semaphonre
  semaphore:
    max-semaphores: 2000

2. consumer项目,此项目主要是处理与业务相关的接口,合成接口。所以使用了hystrix,ribbion

###############
feign:
  hystrix:
    enabled: true
  httpclient:
    enabled: true
    max-connetctions: 2000
    max-connections-per-route: 500
  okhttp:
    enabled: false
ribbon:
  ReadTimeout: 15000
  ConnectTimeout: 15000
  MaxAutoRetries: 0
  MaxAutoRetriesNextServer: 1
hystrix:
  threadpool:
    SERVICEMEMBER:
      coreSize: 500
      maximumSize: 1000
  command:
    SERVICEMEMBER: ##自己定义的服务名,单独设置参数
      fallback:
        ### 是否启用降级服务。默认true
        enabled: false

      ## 是否使用断路器来跟踪其健康指标和熔断请求,默认true
      circuitBreaker:
        enabled: true
        ##一个统计窗口内熔断触发的最小个数/10s
        requestVolumeThreshold: 500
        ##熔断多少秒后去尝试请求
        sleepWindowInMilliseconds: 5000
        ###失败率达到多少百分比后熔断
        errorThresholdPercentage: 80
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 20000
          strategy: THREAD
    default: ##默认其他服务使用参数
      fallback:
        ### 是否启用降级服务。默认true
        enabled: false

        ## 是否使用断路器来跟踪其健康指标和熔断请求,默认true
      circuitBreaker:
        enabled: true
        ##一个统计窗口内熔断触发的最小个数/10s
        requestVolumeThreshold: 2000
        ##熔断多少秒后去尝试请求
        sleepWindowInMilliseconds: 5000
        ###失败率达到多少百分比后熔断
        errorThresholdPercentage: 80

      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 20000
          strategy: THREAD

 

以上,记录下来。

 

 

 

 

 

 

visual VM 监听远程内存 提示无法使用 service:jmx:rmi:///jndi/rmi:///jmxrmi

1,启动参数

  

java -Djava.rmi.server.hostname=xxx -Djava.security.policy=jstatd.all.policy -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=8888
-Xmx256m -Xms=256m -jar xxx.jar

 

2,需要开启监听的二个随机端口,如果这两个端口访问不了,会提示 

VisualVM 无法使用 service:jmx:rmi:///jndi/rmi:///jmxrmi 连接到XX.xx.xx.xx 

 

如何查看,如下命令

netstat -tupln |grep {要监听java进程id}

3,命令查看内存占用情况分布

jstat -gcutil xxx 200 3

 

 

springcloud config配置中心 Cannot clone or checkout repository: git@git.example.com/xxx/xxx.git

出现此问题的原因有可能是

  1,私钥,公钥问题

比如,我遇到的就是之前私钥是以前生成的OPENSSH 私钥,与现在要求生成RSA 私钥公钥不符

所以,请认准私钥头部的 -----BEGIN RSA PRIVATE KEY-----

 

 2,gitlab仓库,未配置公钥

  可以在  gitlab 右上角,点开头像,settings --左侧栏 ssh-keys 

  把在服务器生成的公钥 id_rsa.pub 如果你另外生成的可能是xxx.pub复制过来

 

3,配置文件私钥配置

  一定要注意格式

  

以上为生产服务器配置使用,

 

另外,生成私钥命令为

ssh-keygen -t rsa -C "你的邮箱" 

 

如果已经存在id_rsa 可以指定另外一个

ssh-keygen -t rsa -C "你的邮箱" -f git

 

springcloud turbine 集群配置

1, 创建程序,修改pom.xml

   

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-turbine</artifactId>
        </dependency>

 

2,启动类

    

@EnableTurbine
@SpringBootApplication
@EnableCircuitBreaker
@EnableDiscoveryClient
@EnableHystrixDashboard
public class ShineHystrixApplication {
     
    
    public static void main(String[] args) {
        SpringApplication.run(ShineHystrixApplication.class, args);
        System.out.print("++++++++++++++++++++++++熔断器中心启动成功!!++++++++++++++++++++++++");

    }
}

 

3,配置文件

  

server:
  port: 8768
spring:
  application:
    name: server-hystrix
  security:
    user:
      name: admin
      password: shine
eureka:
  client:
    service-url:
      defaultZone: http://${spring.security.user.name}:${spring.security.user.password}@localhost:8761/eureka/
    fetch-registry: true
    register-with-eureka: true
turbine:
  cluster-name-expression: new String("default")
  combine-host-port: true
##要监控的服务,各服务以,分隔 app-config: PlaceSerProFegin,ContentSerProFegin,MemberSerProFegin management: endpoints: web: exposure: include: hystrix.stream

 

 

 

 

springcloud eureka 开启密码配置,其他应用注册时,提示403

当其他应用注册eureka时,提示403,如下

failure with status code 403; retrying on another server if available

 

原因:

 springcloud2.0以后,密码校验开启了crf。所以把crf关闭即可。

增加配置类

@EnableWebSecurity
public class WebSecurityConfigurer extends WebSecurityConfigurerAdapter {
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.csrf().disable();
        super.configure(http);
    }
}

 

重启应用即可。

springcloud config 属性文件中文乱码 有更新!

问题,springcloud config-server服务配置git 存储文件中有包含中文,读取属性文件以后,出现乱码!!

git仓库中的属性文件名 locale_cn.properties

如下所示

shine.login.test=测试项目
shine.login.abc=终于来个Ok


请求 http://localhost:8769/locale/cn

出现乱码。




解决:

1,加载属性文件类,重写

OriginTrackedPropertiesLoader类。

2,自己定义加载属性文件

我采用,自己定义加载属性文件

具体如下:

  

public class MyPropertiesHandler implements PropertySourceLoader {
    @Override
    public String[] getFileExtensions() {
        return new String[]{"properties", "xml"};
    }

    @Override
    public List<PropertySource<?>> load(String name, Resource resource) throws IOException {
        Map<String, ?> properties = this.loadProperties(resource);
        return properties.isEmpty() ? Collections.emptyList() : Collections.singletonList(new OriginTrackedMapPropertySource(name, properties));
    }

    private Map<String, ?> loadProperties(Resource resource) throws IOException {
        String filename = resource.getFilename();
        return (Map) (filename != null && filename.endsWith(".xml") ? PropertiesLoaderUtils.loadProperties(resource) : loadPro(resource));
    }

    
    private Map<String, ?> loadPro(Resource resource) {
        Map<String, String> proMap = new HashMap<>();
        try {
            InputStream inputStream = resource.getInputStream();
            List<Byte> byteList = new LinkedList<Byte>();
            byte[] readByte = new byte[1024];
            int length;
            while ((length = inputStream.read(readByte)) > 0) {
                for (int i = 0; i < length; i++) {
                    byteList.add(readByte[i]);
                }
            }
            byte[] allBytes = new byte[byteList.size()];
            int index = 0;
            for (Byte soloByte : byteList) {
                allBytes[index] = soloByte;
                index += 1;
            }
            String str = new String(allBytes, "UTF-8");
            String[] strs = StringUtils.splitByWholeSeparator(str, "\n");
            for (String tmp_str : strs) {
                if (StringUtils.isNotBlank(tmp_str)) {
                    String[] tmpstr = StringUtils.splitByWholeSeparator(tmp_str, "=");
                    proMap.put(tmpstr[0], tmpstr[1]);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return proMap;

    }
}

 

同时,增加配置在 src/resources目录下增加文件夹

META-INF/spring.factories

添加一行代码 

org.springframework.boot.env.PropertySourceLoader=com.shine.configserver.MyPropertiesHandler

 

重启应用,再刷新 http://localhost:8769/locale/cn 即可看到正确的

 

 

 

 

 

 

 

 

 


 

 
















springcloud feign 多文件上传 有更新!

网上也有不同的上传,但是对于多文件上传的解决方案比较少,参考了https://blog.csdn.net/qq_32953079/article/details/8163081

1,导入引用 的文件 

   

<dependency>
            <groupId>io.github.openfeign.form</groupId>
            <artifactId>feign-form</artifactId>
        </dependency>
        <dependency>
            <groupId>io.github.openfeign.form</groupId>
            <artifactId>feign-form-spring</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
        </dependency>

 

2,定义上传接口

 

@FeignClient(value = "PLACESERVICEPROVIDER", configuration = FeignMultipartSupportConfig.class)
public interface FileService {

 @PostMapping(value = "/file/uploadMulti", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResult uploadMulti(@RequestPart(value = "files") MultipartFile[] files) throws ServiceException;
}

@Configuration
public class FeignMultipartSupportConfig {
    @Autowired
    private ObjectFactory<HttpMessageConverters> messageConverters;

    @Bean
    public Encoder feignEncoder() {
        return new SpringMultipartEncoder(new SpringEncoder(messageConverters));
    }
}

public class SpringMultipartEncoder extends FormEncoder {

    public SpringMultipartEncoder() {
        this(new Default());
    }


    /**
     * Constructor with specified delegate encoder.
     *
     * @param delegate delegate encoder, if this encoder couldn't encode object.
     */
    public SpringMultipartEncoder(Encoder delegate) {
        super(delegate);

        MultipartFormContentProcessor processor = (MultipartFormContentProcessor) getContentProcessor(ContentType.MULTIPART);
        processor.addWriter(new SpringSingleMultipartFileWriter());
        processor.addWriter(new SpringManyMultipartFilesWriter());
    }


    @Override
    public void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException {
        // 单MultipartFile判断
        if (bodyType.equals(MultipartFile.class)) {
            MultipartFile file = (MultipartFile) object;
            Map data = Collections.singletonMap(file.getName(), object);
            super.encode(data, MAP_STRING_WILDCARD, template);
            return;
        } else if (bodyType.equals(MultipartFile[].class)) {
            // MultipartFile数组处理
            MultipartFile[] file = (MultipartFile[]) object;
            if (file != null) {
                Map data = Collections.singletonMap(file.length == 0 ? "" : file[0].getName(), object);
                super.encode(data, MAP_STRING_WILDCARD, template);
                return;
            }
        }
        // 其他类型调用父类默认处理方法
        super.encode(object, bodyType, template);

    }
}


3,服务端实现接口

  

@RestController
public class FileServiceImpl implements FileService {
   @Override
    public BaseResult uploadMulti(MultipartFile[] files) throws ServiceException{
  //do Something
}

}

 

 

 一般默认的上传文件大小,会限制1m大小,可以增加配置

具体设置参见 这篇文章的设置

 

 

 

 

 

springcloud fegin 支持文件上传 有更新!

1,API 工程

   定义上传文件接口

   

@FeignClient(value = "PLACESERVICEPROVIDER", configuration = FileService.MutilSupportConfig.class)
public interface FileService {

    @PostMapping(value = "/file/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResult upload(@RequestPart(value = "file") MultipartFile file) throws ServiceException;

@Configuration
class MutilSupportConfig {

        @Autowired
ObjectFactory<HttpMessageConverters> messageConvertos;

@Bean
public Encoder feignFormEncoder() {
            return new SpringFormEncoder(new SpringEncoder(messageConvertos));
}
    }
}




 

2,服务方工程

  具体实现如下

  

@Service
@RestController
public class FileServiceImpl implements FileService {
    @Override
    public BaseResult upload(MultipartFile file) throws ServiceException {
        log.info("====<<<<进入上传处理接口");
        if (file == null) {
            return BaseResultGenerator.failure("上传文件file不能为空");
        }
        return BaseResultGenerator.success(file.getOriginalFilename());
    }
}

  

 

3,消费方工程

  具体实现如下

 

@Slf4j
@RestController
public class FileController {

    @Autowired
    FileService fileService;

    @PostMapping(value = "/file/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResult upload(MultipartFile file) throws ServiceException {
        log.debug("=============>进入上传接口{}", file.getName());
        return fileService.upload(file);
    }
}



 

4. 要引入的包

 <dependency>
                <groupId>io.github.openfeign.form</groupId>
                <artifactId>feign-form-spring</artifactId>
                <version>3.2.2</version>
            </dependency>
            <dependency>
                <groupId>io.github.openfeign.form</groupId>
                <artifactId>feign-form</artifactId>
                <version>3.2.2</version>
            </dependency>

 

 注意,对于上传大文件,超过1M时,会提示文件过大。

在配置文件中需要设置

spring.servlet.multipart.enabled=true

spring.servlet.multipart.max-file-size=10MB

spring.servlet.multipart.max-request-size=10MB

 

原来1.5.9配置为

spring.http.multipart.enabled=true

spring.http.multipart.max-file-size=10MB

spring.http.multipart.max-request-size=10MB

 

在使用Fegin上传时,不光要处理zuul的文件大小,文件超时问题,还要注意也要修改服务方的文件大小以及超时。


        

springcloud Feign @RequestBody 有更新!

1,API工程接口定义

   

@FeginClient(vlaue="memberprovider",fallback=LoginServiceFeginHystrix.class)
public interface LoginService{

  /***
     * 注册新用户
     * */
    @PostMapping(value = "/login/reg")
    public MemberDTO regMember(@RequestBody MemberVo memberVo) throws BusinessException, ServiceException;

}
    熔断服务

  @Slf4j

@Component
public class LoginServiceFeginHystrix implements LoginService {

   @Override
    public MemberDTO regMember(MemberVo memberVo) throws BusinessException, ServiceException {
        log.error("====<<<<进入熔断服务..{}", "LoginServiceFeginHystrix.regMember()");
        return null;
    }
}

}

  

2, 服务端工程实现

   

 @Override
 public MemberDTO regMember(@RequestBody MemberVo memberVo) throws BusinessException, ServiceException {
 //do somethings
}

 

3,消费端调用

 

 @PostMapping("/login/reg")
    public BaseResult reg(MemberVo memberVo) throws BusinessException, ServiceException {
 MemberDTO memberDTO = loginService.regMember(memberVo);
......
}

 

要注意要点:

  服务端实现一般使用工具生成,没有带 @RequestBody

 另外一个,注意接口定义的包名,与消费端包扫描的路径,确认消费端是否会把接口定义的路径扫描进去。

如,接口路径 com.xuahua.api.common.xx

 而消费端的启动类在 com.xuahua.consumer.xx 这样就会扫不到,需要手动添加@ComponetScan(basePackages="")

 缺少这个导致请求过来的值为空。

 

 

 

layui table表格数据刷新几点注意 有更新!

layui 表格

数据表格刷新,需要注意的几点:

1,在页面初始化时,事件方法以及按钮的事件绑定。

 

2. 页面渲染时的组件使用,默认不使用form的监听事件来提交查询。

 

具体如下:

<div id="placeSearch" class="layui-form layui-form-pane" >

<div class="layui-col-md3">
                <div class="layui-inline">
                    <button class="layui-btn" data-type="reload">提交</button>
                </div>
            </div>
</div>

 

js 代码段

  

$(function(){

      layui.use(['table','form'],function(){

              var form = layui.form,

                 table = layui.table;

               table.render({

                     id:'tableload',

                   xxxxxx

                })

     })



  //定义事件

    active = {

      reload : function(){

           table.reload('tableload', {
  
                   where: {
                         key :value
                      }
                , page: {
                    curr: 1//从当前页码开始
                    }
            });
 

         },//还可以增加其他的方法

 

         }

 //绑定按钮事件
  $("#placeSearch .layui-btn").on('click', function ({
        var type = $(this).data('type');
        active[type] ? active[type].call(this) : '';
    })

})


//其他地方也可以调用,直接使用onclick事件
$("#placeSearch .layui-btn").click();


以上标粗的地方,是要注意的地方

springboot1.5.9升级至 springboot2.0.5

项目技术框架使用了 springboot1.5.9+mybatis+druid数据库连接池+mysql

现在由于一些原因,决定把springboot升级到2.0.5

在升级过程中出现了一些问题,现在记录如下:

1,拦截器的失效,原来的类,攺为使用 WebMvcConfigurer

2,thymeleaf-extras-shiro 版本需要升级到2.0.0

3,原来thymeleaf中的一些写法比如 <texarea />会突然页面解析不完全,需要全部攺为<texarea></texarea>

 

oracle-高水位线-压缩表空间 有更新!

压缩表记录大小

1,查询各用户名下用户的记录

select t.OWNER || '.' ||t.table_name names,t.num_rows from dba_tables t where t.OWNER in ('ORD','PROD','TICKET','MEM','ADMINCAS','MIDDLE','CHART') AND t.NUM_ROWS is not null order by t.num_rows desc;

 

2,压缩,不要忘记要重建索引

ALTER TABLE  owner.tableName MOVE ; 

 

 

3,统计分析表

analyze table  owner.tableName compute statistics;

 

如果提示object statistics are locked 

先使用如下命令解锁

exec dbms_stats.unlock_table_stats('OWNER','tableName');

或者

begin

dbms_stats.unlock_table_stats('OWNER','tableName');
end;

然后执行  analyze table  owner.tableName compute statistics;

4,查询索引是否需要重建

select 'alter index '||owner||'.'||index_name ||' rebuild online ; ' command,table_name,tablespace_name,index_type,status from dba_indexes where table_name ='tableName';

5,重建索引

alter index OWNER.INDEXNAME rebuild online

 

6,效果检验 -- 水位线查询


select TABLE_NAME,TABLESPACE_NAME,BLOCKS,EMPTY_BLOCKS from DBA_tables where table_name='T_QUERY_ORDERCOUNT';

500 OOPS: could not bind listening IPv4 socket

在启动vsftpd 过程 出现了以上错误,

配置文件如下:

# 禁止匿名用户anonymous登录
anonymous_enable=NO
# # 允许本地用户登录
local_enable=YES
# # 让登录的用户有写权限(上传,删除)
write_enable=YES
# # 默认umask
local_umask=022
data_connection_timeout=120
# # 把传输记录的日志保存到/var/log/vsftpd.log
#xferlog_enable=YES
xferlog_file=/data/log/vsftpd.log
xferlog_std_format=NO
# # 允许ASCII模式上传
#ascii_upload_enable=YES 
# # 允许ASCII模式下载

#ascii_download_enable=YES
# # 使用20号端口传输数据
connect_from_port_20=YES
# # 欢迎标语
ftpd_banner=Welcome to ftp server.
# # 接下来的三条配置很重要
# # chroot_local_user设置了YES,那么所有的用户默认将被chroot,
# # 也就用户目录被限制在了自己的home下,无法向上改变目录。
#chroot_list_enable=YES 
#设置了YES,即让chroot用户列表有效。
# # ★超重要:如果chroot_local_user设置了YES,那么chroot_list_file
# # 设置的文件里,是不被chroot的用户(可以向上改变目录)
# # ★超重要:如果chroot_local_user设置了NO,那么chroot_list_file
# # 设置的文件里,是被chroot的用户(无法向上改变目录)
chroot_list_enable=YES
# # touch /etc/vsftpd/chroot_list 新建
chroot_list_file=/etc/vsftpd/chroot_list_file
use_localtime=YES
# # 以standalone模式在ipv4上运行
listen=YES
listen_port=21
pasv_max_port=2100
pasv_min_port=2100
allow_writeable_chroot=YES

个人解决如下:

  1,是否端口占用。。命令查看 netstat -tulpn 

  netstat 命令结果图片

 

仔细看画红框的地方,就会发现端口21实际已经占用了。

接下来,直接kill -9 2127.

重启 /usr/local/sbin/vsftpd /etc/vsftpd/vsftpd.conf &

问题解决!

eclipse提速小技巧

1,工程目录右边,viewmenu 小图标处,点开。

如下图viewmenu菜单图标

 

 

2,点击“Filters and Customization..."

 

3,弹出的窗口点,选中”Content" 标签页

 

content

 

把红框勾选的,拿掉,你重启后,再试下是不是快了很多。。

 

 

最简单的RPC框架实现

1,接口定义

public interface EchoService{
    String echo(String ping);
}

 

2,接口实现

public Class EchoServiceImpl implements EchoService{

   @Override
   public String echo(String ping){
        return ping != null?ping + " -- > I am OK .":"I am Ok";
   }
}

 

3,RPC服务端服务发布者代码实现

public class RpcExporter{
   static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

   public static void exporter(String hostname,int port) throws Exception {
      ServerSocket server = new ServerSocket();
      server.bind(new InetSocketAddress(hostName,port));
      try{
         while(true){
           executor.execute(new ExporterTask(server.accept()));
         }
      }finally{
         server.close();
       }
   }
}


private static class ExporterTask implements Runnable{

   Socket client = null;
   public ExporterTask(Socket client){
     this.client = client;
   }
  @Override
  public ExporterTask(Socket client){
     this.client = client;
  }

  @Override
   public void run(){
     
			ObjectInputStream input = null;
			ObjectOutputStream output = null;
			
			try{
				input = new ObjectInputStream(client.getInputStream());
				String interfaceName = input.readUTF();
				Class<?> service = Class.forName(interfaceName);
				String methodName = input.readUTF();
				Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
				Object[] argments = (Object[]) input.readObject();
				Method method = service.getMethod(methodName, parameterTypes);
				Object result = method.invoke(service.newInstance(),argments);
				output = new ObjectOutputStream(client.getOutputStream());
				output.writeObject(result);
				
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				if(output !=null){
					try {
						output.close();
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				
				if(input !=null){
					try {
						input.close();
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				
				if(client!=null){
					try {
						client.close();
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		
  }
}

 

4,本地代理服务 RpcImporter

public class RpcImporter<S> {

	public S importer(final Class<?> serviceClass,final InetSocketAddress addr){
		return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[] {serviceClass.getInterfaces()[0]}, new InvocationHandler(){

			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				
				Socket socket = null;
				ObjectOutputStream output = null;
				ObjectInputStream input = null;
				try{
					socket = new Socket();
					socket.connect(addr);
					output = new ObjectOutputStream(socket.getOutputStream());
					output.writeUTF(serviceClass.getName());
					output.writeUTF(method.getName());
					output.writeObject(method.getParameterTypes());
					output.writeObject(args);
					input = new ObjectInputStream(socket.getInputStream());
					return input.readObject();
				}finally{
					if(socket != null){
						socket.close();
					}
					
					if(output != null){
						output.close();
					}
					
					if(input != null ){
						input.close();
					}
				}
			}
			
		});
	}
}

 

5, 测试代码 RpcTest

 

public class RpcTest {

	public static void main(String[] args) {
		new Thread(new Runnable(){

			@Override
			public void run() {
				try{
					RpcExporter.exporter("localhost",8080);
				}catch(Exception e){
					e.printStackTrace();
				}finally{
					
				}
			}
			
		}).start();
		
		RpcImporter<EchoService> importer = new RpcImporter<EchoService>();
		EchoService echo = importer.importer(EchoServiceImpl.class,new InetSocketAddress("localhost",8080));
		System.out.println(echo.echo("Are you ok? "));
	}
}

 

运行结果.

Are you ok?  -- > I am Ok !

java基础篇-字符串隐藏1 有更新!

一,字符串隐藏替换

     1,需求

         长度为二位以下者,还回原串
         长度为二位以上到四位者,保留前前一位与最后一位中间以***替换
         长度为四位以上者到八位者,保留前二位与最最后二位,中间以***替换
         长度为八位以上者,保留以前三位与后三位,中间以***替换。

     2,适用场景

         针对用户名,姓名等敏感信息,做隐藏处理。

    3,代码实现 

   

/**
	 * 长度为二位以下者,还回原串
	 * 长度为二位以上到四位者,保留前前一位与最后一位中间以***替换
	 * 长度为四位以上者到八位者,保留前二位与最最后二位,中间以***替换
	 * 长度为八位以上者,保留以前三位与后三位,中间以***替换。
	 * @param number
	 * @return
	 */
	public static String formatNumber(String number){
		if(StringUtils.isBlank(number)) return "";
		if(number.length()<=2) return number;
		if(number.length()<=4) return number.substring(0,1)+"***"+number.substring(number.length()-1);
		if(number.length()<8) return number.substring(0,2)+"***"+number.substring(number.length()-2);
		return number.substring(0,3)+"***"+number.substring(number.length()-3);
	}

java基础篇-字符串隐藏2 有更新!

1,需求

   

* 名称格式化,名称隐藏 <s:property
* value="@com.ecp.common.helper.StringUtil@formatStr(name)"/>
*
* @param name
* @param myname
* 没有传null
* @return 若:中文名称 (name为空:*** name为一位:name+*** name为两位或者以下:张***
* name为两位以上:张***三 )
*
* 若:英文名称 (name为空:*** name为一位:name+***
* name为四位或者以下:长度减一+***+最后一位 name为四位以上:前四位+***+最后一位 )
*/

2,代码

public static String formatStr(String name, String myname) {

		if (StringUtils.isBlank(name)) {
			return "***";
		}
		if (name.equals(myname)) {
			return name;
		}
		if (name.length() < 2) {
			return name + "***";
		}
		String lastname  = name.substring(name.length()-1);
		String regEx = "[\u4e00-\u9fa5]";
		Matcher matcher = Pattern.compile(regEx).matcher(name);
		if (matcher.find())
			name = name.length() <= 2 ? name.substring(0, name.length() - 1)
					: name.substring(0, 2);
		else
			name = name.length() <= 4 ? name.substring(0, name.length() - 1)
					: name.substring(0, 4);
		return name + "***"+lastname;
	}

公告

喧哗博客--繁华过后的沉寂--技术经验分享^-^
Copyright (c) 2009-2019, b3log.org & hacpai.com