javaIO和netty的基本原理

This commit is contained in:
法然
2022-11-27 14:45:19 +08:00
parent f90e4ed166
commit 19d3b3e3e2
169 changed files with 5444 additions and 3691 deletions

View File

@@ -0,0 +1,347 @@
# WebFlux
> https://blog.csdn.net/crazymakercircle/article/details/124120506
> https://zhuanlan.zhihu.com/p/378136040
## 1 WebFlux简介
> 第一轮:就是看视频教程学会所有技术的原理和基本使用方法
> 第二轮阅读官方的文档尤其是Spring、Java、Maven等掌握编程的细节。
### 简介
Spring5添加的新模块。用于web开发的功能与SpringMVC类似。Webflux使用与当前比较流行的响应式编程出现的框架。
传统的Web框架比如SpringMVC基于Servlet容器Webflux是一种异步非阻塞的框架。异步非阻塞的框架在Servlet3.1后才支持)
其和虚拟式基于Reactor的相关API实现的。
### 异步非阻塞
我更喜欢反过来理解这里。同步异步是针对被调用者的,阻塞和非阻塞是针对调用者的。
* **阻塞和非阻塞针对调用者**。阻塞和非阻塞指的是调用者(程序)在等待返回结果(或输入)时的状态。阻塞时,在调用结果返回前,当前线程会被挂起,并在得到结果之后返回。非阻塞时,如果不能立刻得到结果,则该调用者不会阻塞当前线程。因此对应非阻塞的情况,调用者需要定时轮询查看处理状态。
* **异步和同步是针对被调用者**,同步: 同步就是发起一个调用后,被调用者未处理完请求之前,调用不返回。异步: 异步就是发起一个调用后,立刻得到被调用者的回应表示已接收到请求,但是被调用者并没有返回结果,此时我们可以处理其他的请求,被调用者通常依靠事件,回调等机制来通知调用者其返回结果。
### 与SpringMVC对比
![](image/2022-10-12-19-28-38.png)
* 异步非阻塞,在有限的资源下,能够处理更多的请求,提高系统地吞吐量。
* 函数式编程。Java最基本的编程模式。能够使用Java函数式编程的特点。
* 两个框架都可以使用注解方式运行都可以运行在Tomcat等Servlet容器中。但SpringMVC采用命令式编程WebFlux使用响应式编程。
### 使用场景:网关
* 需要处理大量的请求。所有客户调用网关,网关负责调用其他的组件。可以使用异步的方式。
## 2 响应式编程
### 响应式编程定义
响应式编程是一种面向数据流和变化产波的编程范式。
意味着可以在编程语言很方便地表达静态或者动态的数据流,
![](image/2022-10-12-19-35-03.png)
一个响应式编程的典型例子。D1=B1+C1。当B1的值修改后D1的值也会修改。B1的数据变化流向了D1。
其基本的模型如下
* 可观察对象、观察者。(观察者模式)
* 发布者、订阅者。(发布订阅机制)
* 事件、响应。(事件驱动、事件监听机制、响应式编程)
### Java8响应式编程
是要使用观察者模式实现了响应式编程。使用响应式编程Observer,Observable实现。
```java
/**
* Alipay.com Inc.
* Copyright (c) 2004-2022 All Rights Reserved.
*/
package com.ykl.shangguigu08.reactor;
import java.util.Observable;
/**
* @author yinkanglong
* @version : ObserverDemo, v 0.1 2022-10-12 19:47 yinkanglong Exp $
*/
public class ObserverDemo extends Observable {
/**
* 通过Java8中的类实现响应式编程。
* 简单来说,就是观察值模式。
* @param args
*/
@Test
public void testObserver() {
//其中observerDemo是一个可观察对象Observer添加了匿名的观察者。
//二者通过addObserver建立联系。
//observable对象通过setChanged和notifyObservers发送通知、事件。进行相应。
ObserverDemo observerDemo = new ObserverDemo();
observerDemo.addObserver((o,arg)->{
System.out.println("发生变化");
});
observerDemo.addObserver((o,arg)->{
System.out.println("准备改变");
});
observerDemo.addObserver(new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(o);
System.out.println(arg);
}
});
observerDemo.setA(11);
observerDemo.setChanged();
System.out.println(observerDemo.hasChanged());
observerDemo.notifyObservers();
observerDemo.notifyObservers("hello world");
}
}
```
### java9响应式编程
主要通过Flow类的sub和sub订阅消息实现响应式编程。
> 感觉这个响应式编程和awt控件的点击相应式操作很相似。但是不是启动新的线程。
```java
Observable<Integer> observable=Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0;i<5;i++){
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
//Observable.subscribe(Observer)Observer订阅了Observable
Subscription subscribe = observable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "异常");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "接收Obsverable中发射的值" + integer);
}
});
输出
接收Obsverable中发射的值0
接收Obsverable中发射的值1
接收Obsverable中发射的值2
接收Obsverable中发射的值3
接收Obsverable中发射的值4
```
### Flux&Mono响应式编程
* 响应式编程操作Reactor是满足Reactive规范框架
* Reactor有两个核心类Mono和Flux这两个类实现接口Publisher提供丰富操作符号.
* Flux对象实现发布返回N个元素。
* Mono实现发布者返回0或者1个元素。把 Mono 用于在异步任务完成时发出通知。
* Flux和Mono都是数据流的发布者。能够发出三种信号
* 元素值
* 完成信号。一种终止信号。订阅者数据流已经结束了。
* 错误信号。一种终止信号。终止数据流并把错误信息传递给订阅者。
![](image/2022-10-13-10-22-26.png)
三种信号的特点
* 错误信号和完成信号都是终止信号不能共存。
* 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示空数据流
* 如果没有错误信号,没有完成信号,表示无限数据流。
### 实例Flux&Mono
引入相关的依赖
```xml
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.5.RELEASE</version>
</dependency>
```
进行发布者发布内容
* just等发布方法只是声明了数据流。只有声明了订阅者才会触发数据流不订阅就不会触发。
```java
package com.ykl.shangguigu08.reactor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
/**
* @author yinkanglong
* @version : TestReactor, v 0.1 2022-10-13 10:25 yinkanglong Exp $
*/
public class TestReactor {
@Test
public void testFlux(){
//其中Flux/Mono就是一个可观察对象/发布者。Consumer是一个观察者/消费者。
//二者通过subscriber建立联系。
//直接发送通知。不需要调用特定的函数。
Flux<Integer> justInteger = Flux.just(1, 2, 3, 4);
justInteger.subscribe(System.out::println);
Integer[] array = {1,2,3,4};
Flux.fromArray(array).subscribe(System.out::println);
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list).subscribe(System.out::println);
Stream<Integer> stream = list.stream();
Flux.fromStream(stream).subscribe(System.out::println);
Flux.just("hello","world").subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
}
}
```
```java
public Mono<ClientUser> currentUser () {
return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
: Mono.empty();
}
```
## 4 WebFlux执行流程和核心API
### Netty的基本原理
SpringWebflux基于Reactor默认使用容器NettyNetty是高性能的NIO框架异步非阻塞框架。
1. BIO阻塞
![](image/2022-10-13-11-44-28.png)
2. NIO非阻塞
![](image/2022-10-13-11-44-49.png)
3. webflux
![](image/2022-11-25-20-26-34.png)
### SpringWebFlux
* SpringWebflux核心控制器DispatchHandler实现接口WebHandler
![](image/2022-10-13-11-48-44.png)
DispatcherHandler负责请求处理。有三个核心类。
![](image/2022-10-13-11-51-20.png)
* HandlerMappingreactor反应器请求查询到处理方法。
* HandlerAdapter真正负责请求处理processor部分
* HandlerResultHandler对结果进行处理
### 函数式编程
两个核心接口。
* RouterFunction 路由处理
* HandlerFunction处理函数
常用函数编程示例
* Consumer 一个输入 无输出
```java
Product product=new Product();
//类名+静态方法 一个输入T 没有输出
Consumer consumer1 = Product->Product.nameOf(product);//lambda
consumer1.accept(product);
Consumer consumer = Product::nameOf;//方法引用
consumer.accept(product);
```
* Funtion<T,R> 一个输入 一个输出
```java
//对象+方法 一个输入T 一个输出R
Function<Integer, Integer> function = product::reduceStock;
System.out.println("剩余库存:" + function.apply(10));
//带参数的构造函数
Function<Integer,Product> function1=Product::new;
System.out.println("新对象:" +function1.apply(200));
```
* Predicate 一个输入T, 一个输出 Boolean
```java
//Predicate 一个输入T 一个输出Boolean
Predicate predicate= i -> product.isEnough(i);//lambda
System.out.println("库存是否足够:"+predicate.test(100));
Predicate predicate1= product::isEnough;//方法引用
System.out.println("库存是否足够:"+predicate1.test(100));
```
* UnaryOperator 一元操作符 输入输出都是T
```java
//一元操作符 输入和输出T
UnaryOperator integerUnaryOperator =product::reduceStock;
System.out.println("剩余库存:" + integerUnaryOperator.apply(20));
IntUnaryOperator intUnaryOperator = product::reduceStock;
System.out.println("剩余库存:" + intUnaryOperator.applyAsInt(30));
```
* Supplier 没有输入 只有输出
```java
//无参数构造函数
Supplier supplier = Product::new;
System.out.println("创建新对象:" + supplier.get());
Supplier supplier1=()->product.getStock();
System.out.println("剩余库存:" + supplier1.get());
```
* BiFunction 二元操作符 两个输入<T,U> 一个输出
```java
//类名+方法
BiFunction<Product, Integer, Integer> binaryOperator = Product::reduceStock;
System.out.println(" 剩余库存(BiFunction)" + binaryOperator.apply(product, 10));
```
* BinaryOperator 二元操作符 ,二个输入 一个输出
```java
//BinaryOperator binaryOperator1=(x,y)->product.reduceStock(x,y);
BinaryOperator binaryOperator1=product::reduceStock;
System.out.println(" 剩余库存(BinaryOperator)" +binaryOperator1.apply(product.getStock(),10));
```
### 流式编程
1. 是Java新支持的一种变成方法与面向对象编程、函数式编程等类似流式编程以来流式编程的API是一种编程的模式本身并不影响代码的功能。也算是语法糖的一种。
2. Flux和Mono只是采用了流式编程思想。不是一种特殊的编程思想。
```java
public Stream<ClientUser> allUsers() {
return Stream.of(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
```
对数据流进行一道道操作,成为操作符,比如工厂流水线。
* 操作符map。将元素映射为新的元素。
* 操作符flatmap。元素映射为流。
![](image/2022-10-13-10-38-26.png)
![](image/2022-10-13-10-41-45.png)

View File

@@ -0,0 +1,314 @@
## 1 Flux类中的静态方法
### 简单的创建方法
**just()**
可以指定序列中包含的全部元素。创建出来的Flux序列在发布这些元素之后会自动结束
**fromArray()fromIterable()fromStream()**
可以从一个数组Iterable对象或Stream对象中穿件Flux对象
**empty()**
创建一个不包含任何元素,只发布结束消息的序列
**error(Throwable error)**
创建一个只包含错误消息的序列
**never()**
传建一个不包含任务消息通知的序列
**range(int start, int count)**
创建包含从start起始的count个数量的Integer对象的序列
**interval(Duration period)和interval(Duration delay, Duration period)**
创建一个包含了从0开始递增的Long对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外还可以指定起始元素发布之前的延迟时间
**intervalMillis(long period)和intervalMillis(long delay, long period)**
与interval()方法相同,但该方法通过毫秒数来指定时间间隔和延迟时间
例子
```
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[]{1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscirbe(System.out::println);
```
### 复杂的序列创建 generate()
当序列的生成需要复杂的逻辑时则应该使用generate()或create()方法。
* generate()方法通过同步和逐一的方式来产生Flux序列。
* 序列的产生是通过调用所提供的的SynchronousSink对象的next()complete()和error(Throwable)方法来完成的。
* 逐一生成的含义是在具体的生成逻辑中next()方法只能最多被调用一次。
* 在某些情况下,序列的生成可能是有状态的,需要用到某些状态对象,此时可以使用
```
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
```
其中stateSupplier用来提供初始的状态对象。
在进行序列生成时状态对象会作为generator使用的第一个参数传入可以在对应的逻辑中对改状态对象进行修改以供下一次生成时使用。
```java
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if( list.size() ==10 )
sink.complete();
return list;
}).subscribe(System.out::println);
```
### 复杂的序列创建 create()
create()方法与generate()方法的不同之处在于所使用的是FluxSink对象。
FluxSink支持同步和异步的消息产生并且可以在一次调用中产生多个元素。
``
Flux.create(sink -> {
for(int i = 0; i < 10; i ++)
sink.next(i);
sink.complete();
}).subscribe(System.out::println);
```
## 2 Mono静态方法
Mono类包含了与Flux类中相同的静态方法just()empty()和never()等。
除此之外Mono还有一些独有的静态方法
* fromCallable()fromCompletionStage()fromFuture()fromRunnable()和fromSupplier()分别从CallableCompletionStageCompletableFutureRunnable和Supplier中创建Mono
* delay(Duration duration)和delayMillis(long duration)创建一个Mono序列在指定的延迟时间之后产生数字0作为唯一值
* ignoreElements(Publisher source)创建一个Mono序列忽略作为源的Publisher中的所有元素只产生消息
* justOrEmpty(Optional<? extends T> data)和justOrEmpty(T data)从一个Optional对象或可能为null的对象中创建Mono。只有Optional对象中包含之或对象不为null时Mono序列才产生对应的元素
```java
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribte(System.out::println);
```
## 3 操作符
### 操作符buffer和bufferTimeout
这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。
在进行收集时可以指定不同的条件所包含的元素的最大数量或收集的时间间隔。方法buffer()仅使用一个条件而bufferTimeout()可以同时指定两个条件。
指定时间间隔时可以使用Duration对象或毫秒数即使用bufferMillis()或bufferTimeoutMillis()两个方法。
除了元素数量和时间间隔外还可以通过bufferUntil和bufferWhile操作符来进行收集。这两个操作符的参数时表示每个集合中的元素索要满足的条件的Predicate对象。
bufferUntil会一直收集直到Predicate返回true。
使得Predicate返回true的那个元素可以选择添加到当前集合或下一个集合中bufferWhile则只有当Predicate返回true时才会收集。一旦为false会立即开始下一次收集。
```java
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i%2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i%2 == 0).subscribe(System.out::println);
```
### 操作符Filter
对流中包含的元素进行过滤只留下满足Predicate指定条件的元素。
```
Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);
```
### 操作符zipWith
zipWith操作符把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理由此得到的是一个元素类型为Tuple2的流也可以通过一个BiFunction函数对合并的元素进行处理所得到的流的元素类型为该函数的返回值。
```java
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);
```
### 操作符take
take系列操作符用来从当前流中提取元素。提取方式如下
* take(long n)take(Duration timespan)和takeMillis(long timespan):按照指定的数量或时间间隔来提取
* takeLast(long n)提取流中的最后N个元素
* takeUntil(Predicate<? super T> predicate) 提取元素直到Predicate返回true
* takeWhile(Predicate<? super T> continuePredicate)当Predicate返回true时才进行提取
* takeUntilOther(Publisher<?> other):提取元素知道另外一个流开始产生元素
```java
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
```
### 操作符reduce和reduceWith
reduce和reduceWith操作符对流中包含的所有元素进行累计操作得到一个包含计算结果的Mono序列。累计操作是通过一个BiFunction来表示的。在操作时可以指定一个初始值。若没有初始值则序列的第一个元素作为初始值。
```
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x + y) -> x + y).subscribe(System.out::println);
```
### 操作符merge和mergeSequential
merge和mergeSequential操作符用来把多个流合并成一个Flux序列。merge按照所有流中元素的实际产生序列来合并而mergeSequential按照所有流被订阅的顺序以流为单位进行合并。
```java
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
```
### 操作符flatMap和flatMapSequential
flatMap和flatMapSequential操作符把流中的每个元素转换成一个流再把所有流中的元素进行合并。flatMapSequential和flatMap之间的区别与mergeSequential和merge是一样的。
```java
Flux.just(5, 10).flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);
```
### 操作符concatMap
concatMap操作符的作用也是把流中的每个元素转换成一个流再把所有流进行合并。concatMap会根据原始流中的元素顺序依次把转换之后的流进行合并并且concatMap堆转换之后的流的订阅是动态进行的而flatMapSequential在合并之前就已经订阅了所有的流。
```
Flux.just(5, 10).concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);
```
### 操作符combineLatest
combineLatest操作符把所有流中的最新产生的元素合并成一个新的元素作为返回结果流中的元素。只要其中任何一个流中产生了新的元素合并操作就会被执行一次结果流中就会产生新的元素。
```
Flux.combineLatest(Arrays::toString, Flux.intervalMillis(100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
```
## 4 消息处理
当需要处理Flux或Mono中的消息时可以通过subscribe方法来添加相应的订阅逻辑。
* 在调用subscribe方法时可以指定需要处理的消息类型。
```java
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).subscribe(System.out::println, System.err::println);
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).onErrorReturn(0).subscribe(System.out::println);
```
* 第2种可以通过switchOnError()方法来使用另外的流来产生元素。
```java
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).switchOnError(Mono.just(0)).subscribe(System.out::println);
```
* 第三种是通过onErrorResumeWith()方法来根据不同的异常类型来选择要使用的产生元素的流。
```java
Flux.just(1, 2).concatWith(Mono.error(new IllegalArgumentException())).onErrorResumeWith(e -> {
if(e instanceof IllegalStateException)
return Mono.just(0);
else if(e instanceof IllegalArgumentException)
return Mono.just(-1);
return Mono.epmty();
}).subscribe(System,.out::println);
```
* 当出现错误时还可以使用retry操作符来进行重试。重试的动作是通过重新订阅序列来实现的。在使用retry操作时还可以指定重试的次数。
```java
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).retry(1).subscrible(System.out::println);
```
## 5 调度器Scheduler
通过调度器可以指定操作执行的方式和所在的线程。有以下几种不同的调度器实现
* 当前线程通过Schedulers.immediate()方法来创建
* 单一的可复用的线程通过Schedulers.single()方法来创建
* 弹性的线程池通过Schedulers.elastic()方法来创建。线程池中的线程是可以复用的。当所需要时新的线程会被创建。若一个线程闲置时间太长则会被销毁。该调度器适用于I/O操作相关的流的处理
* 并行操作优化的线程池通过Schedulers.parallel()方法来创建。其中的线程数量取决于CPU的核的数量。该调度器适用于计算密集型的流的处理
* 使用支持任务调度的调度器通过Schedulers.timer()方法来创建
* 从已有的ExecutorService对象中创建调度器通过Schedulers.fromExecutorService()方法来创建
通过publishOn()和subscribeOn()方法可以切换执行操作调度器。publishOn()方法切换的是操作符的执行方式而subscribeOn()方法切换的是产生流中元素时的执行方式
```java
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
}).publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
```
## 6 测试
StepVerifier的作用是可以对序列中包含的元素进行逐一验证。通过StepVerifier.create()方法对一个流进行包装之后再进行验证。expectNext()方法用来声明测试时所期待的流中的下一个元素的值而verifyComplete()方法则验证流是否正常结束。verifyError()来验证流由于错误而终止。
```
StepVerifier.create(Flux.just(a, b)).expectNext("a").expectNext("b").verifyComplete();
```
使用StepVerifier.withVirtualTime()方法可以创建出使用虚拟时钟的SteoVerifier。通过thenAwait(Duration)方法可以让虚拟时钟前进。
```
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))
 .expectSubscription()
 .expectNoEvent(Duration.ofHours(4))
 .expectNext(0L)
 .thenAwait(Duration.ofDays(1))
 .expectNext(1L)
 .verifyComplete();
```
TestPublisher的作用在于可以控制流中元素的产生甚至是违反反应流规范的情况。通过create()方法创建一个新的TestPublisher对象然后使用next()方法来产生元素使用complete()方法来结束流。
```
final TestPublisher<String> testPublisher = TestPublisher.creater();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();
StepVerifier.create(testPublisher)
.expectNext("a")
.expectNext("b")
.expectComplete();
```
## 7 调试
在调试模式启用之后,所有的操作符在执行时都会保存额外的与执行链相关的信息。当出现错误时,这些信息会被作为异常堆栈信息的一部分输出。
```
Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
```
也可以通过checkpoint操作符来对特定的流处理链来启用调试模式。
```
Flux.just(1, 0).map(x -> 1/x).checkpoint("test").subscribe(System.out::println);
```
## 8 日志记录
可以通过添加log操作把流相关的事件记录在日志中
```
Flux.range(1, 2).log("Range").subscribe(System.out::println);
```
## 9 冷热序列
冷序列的含义是不论订阅者在何时订阅该序列,总是能收到序列中产生的全部消息。热序列是在持续不断的产生消息,订阅者只能获取到在其订阅之后产生的消息。
```java
final Flux<Long> source = Flux.intervalMillis(1000).take(10).publish.autoConnect();
source.subscribe();
Thread.sleep(5000);
source.toStream().forEach(System.out::println);
```

View File

@@ -0,0 +1,243 @@
## 1 WebFlux基于注解的编程的实现
### 创建WebFlux项目
1. 创建Springboot项目引入webflux的依赖
```xml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>shangguigu09</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>shangguigu09</name>
<description>shangguigu09</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
```
2. 在配置文件中设置启动端口号8081
```yaml
server.port =8081
```
3. 从上到下设计代码:创建接口和实现类
```java
@Service
public class UserServiceImpl implements UserService {
private final Map<Integer,User> users = new HashMap<>();
public UserServiceImpl() {
this.users.put(1,new User("lucy","nan",10));
this.users.put(2,new User("mary","nv",38));
this.users.put(3,new User("jack","nv",32));
}
@Override
public Mono<User> getUserById(int id) {
return Mono.justOrEmpty(this.users.get(id));
}
@Override
public Flux<User> getAllUser() {
return Flux.fromIterable(this.users.values());
}
@Override
public Mono<Void> savaUserInfo(Mono<User> userMono) {
return userMono.doOnNext(person->{
int id = users.size() + 1;
users.put(id,person);
}).thenEmpty(Mono.empty());
}
}
```
4. 从下到上实现代码:实现业务逻辑
```java
@RestController
public class UserController {
@Autowired
private UserService userService;
//id
@GetMapping("/user/{id}")
public Mono<User> getUserById(@PathVariable int id){
return userService.getUserById(id);
}
//all
@GetMapping("/user")
public Flux<User> getAllUser(){
return userService.getAllUser();
}
//tianjian
@GetMapping("/saveuser")
public Mono<Void> saveUser(@RequestBody User user){
Mono<User> userMono = Mono.just(user);
return userService.savaUserInfo(userMono);
}
@GetMapping("/hello/{latency}")
public Mono<String> hello(@PathVariable long latency) {
System.out.println("Start:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
System.out.println("Page count:" + COUNT.incrementAndGet());
Mono<String> res = Mono.just("welcome to Spring Webflux").delayElement(Duration.ofSeconds(latency));//阻塞latency秒模拟处理耗时
System.out.println("End: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
return res;
}
}
```
### 实现说明
* SpringMVC范式同步阻塞方式基于SpringMVC+Servlet+Tomcat
* SpringWebflux方式异步非阻塞方式基于SpringMVCWebflux+Reactor+Netty
## 2 WebFlux基于函数的编程的实现
### 简要说明
> bio,nio,aio
在使用函数式编程,需要自己初始化服务器
基于函数式编程模型的时候,有两个核心接口。
* RouterFunction 实现路由功能请求转发给对应的handler
* HandlerFunction 处理请求生成响应函数。
核心任务定义两个函数式接口的实现,并启动需要的服务器。
SpringWebFlux的请求和响应是
* ServerRequest
* ServerResponse
### 实现流程
1. 从上到下实现业务bean
2. 创建handler实现Mono方法
```java
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService){
this.userService = userService;
}
//根据id
public Mono<ServerResponse> getUserById(ServerRequest request){
//获取id值
int userid = Integer.valueOf( request.pathVariable("id"));
Mono<ServerResponse> notFound = ServerResponse.notFound().build();
//调用service方法取得数据
Mono<User> userMono = this.userService.getUserById(userid);
//UserMono进行转换返回。Reactor操作符
return userMono.flatMap(person->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(fromObject(person)))
.switchIfEmpty(notFound);
}
//所有用户
public Mono<ServerResponse> getAllUsers(){
Flux<User> users = this.userService.getAllUser();
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users,User.class);
}
//添加
public Mono<ServerResponse> saveUser(ServerRequest request){
Mono<User> userMono = request.bodyToMono(User.class);
return ServerResponse.ok().build(this.userService.savaUserInfo(userMono));
}
}
```
3. 创建并初始化服务器设置路由和handler
```java
public class Server {
//创建路由
public RouterFunction<ServerResponse> route(){
UserService userService = new UserServiceImpl();
UserHandler handler = new UserHandler(userService);
return RouterFunctions.route(GET("/users/{id}").and(accept(MediaType.APPLICATION_JSON)),handler::getUserById);
// .andRoute(GET("users").and(accept(MediaType.APPLICATION_JSON)),handler::getAllUsers)
// .andRoute(GET("saveuser").and(accept(MediaType.APPLICATION_JSON)),handler::saveUser);
}
public void createReactorServer(){
RouterFunction<ServerResponse> route = route();
HttpHandler httpHandler = toHttpHandler(route);
ReactorHttpHandlerAdapter reactorHttpHandlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer httpServer = HttpServer.create();
httpServer.handle(reactorHttpHandlerAdapter).bindNow();
}
public static void main(String[] args) throws Exception{
Server server = new Server();
server.createReactorServer();
System.out.println("enter to exit");
System.in.read();
}
}
```
## 3 WebClient调用
```java
public class Client {
public static void main(String[] args) {
WebClient webClient = WebClient.create("http://127.0.0.1:62418");
User userMono = webClient.get().uri("/users/{id}", "1").accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).block();
System.out.println(userMono.getName());
}
}
```

View File

@@ -0,0 +1,229 @@
> 参考文件https://blog.csdn.net/hellozpc/article/details/122441522
> https://blog.csdn.net/kerongao/article/details/109746190
> https://www.cnblogs.com/chaosmoor/p/1670308e.html
> https://www.jianshu.com/p/cc3a99614476
## 1 概述
### 简介
它是Spring5中引入的响应式web客户端类库最大特点是支持异步调用我们还将学习WebTestClient用于单元测试。
简单地说WebClient是一个接口执行web请求的主要入口点。
它是Spring Web Reactive模块的一部分并且取代经典的RestTemplate而生。此外新的客户端是一个响应式的、非阻塞的技术方案可以在HTTP/1.1协议上工作。
## 2 使用
### 步骤
使用WebClient我们需要按照如下几步来操作
1. 创建WebClient实例
2. 执行请求
3. 处理返回数据
### 创建WebClient实例
构建WebClient有三种方法
第一种使用默认配置构建WebClient
```java
WebClient client1 = WebClient.create();
```
第二种使用base URI参数构建WebClient
```java
WebClient client2 = WebClient.create("http://localhost:8080");
```
第三种使用DefaultWebClientBuilder构造WebClient。构造器流式编程接口创建。
```java
WebClient client3 = WebClient
.builder()
.baseUrl("http://localhost:8080")
.defaultCookie("cookieKey", "cookieValue")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080"))
.build();
```
### WebClient设置Timeouts
通常默认HTTP的超时为30秒对于实际业务来说时间太长所以让我们看看如何为我们的WebClient实例配置它们。
可以通过TcpClient来设置超时时间超时时间分为链接超时时间和读/写超时时间,为了保证达到预期效果这两个值都需要设置。
我们可以通过ChannelOption.CONNECT_TIMEOUT_MILLIS设置连接超时我们还可以使用ReadTimeoutHandler和WriteTimeoutHandler分别设置读和写超时
```java
TcpClient tcpClient = TcpClient
.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.doOnConnected(connection -> {
connection.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS));
connection.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS));
});
WebClient client = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
.build();
```
> 虽然我们也可以在WebClient客户端请求上调用超时但这是一个信号超时而不是HTTP连接或读/写超时;这是Mono/Flux发布者的超时
### 准备请求数据
**第一步**我们需要通过method(HttpMethod method) 或调yonWebClient来指定请求的HTTP方法get, post,delete:
```java
WebClient.UriSpec<WebClient.RequestBodySpec> request1 = client3.method(HttpMethod.POST);
WebClient.UriSpec<WebClient.RequestBodySpec> request2 = client3.post();
```
**第二步**设置请求访问URL
```java
WebClient.RequestBodySpec uri1 = client3
.method(HttpMethod.POST)
.uri("/resource");
WebClient.RequestBodySpec uri2 = client3
.post()
.uri(URI.create("/resource"));
```
路径参数我们使用uriBuilder构建包含路径参数的uri也可以直接使用占位符。 /events/{id} 访问点并构建相应的 URI
```java
this.webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/events/{id}")
.build(2))
.retrieve();
verifyCalledUrl("/events/2");
String url = "http://localhost:8080/user/{id}/{name}";
String id = "123";
String name = "Boss";
Mono<String> mono = WebClient.create()
.method(HttpMethod.POST)
.uri(url, id, name)
.retrieve()
.bodyToMono(String.class);
String result = mono.block();
```
查询参数:采用 /events?name=[name]&startDate=[startDate]访问点。要设置查询参数,我们需要调用 UriBuilder 接口的 queryParam()方法:
```java
this.webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/events")
.queryParam("name", "InitFailed")
.queryParam("startDate", "13/02/2021")
.build())
.retrieve();
verifyCalledUrl("/events?name=InitFailed&startDate=13/02/2021")
```
**第三步**设置request body、content type、 length、cookies、headers 等请求参数
例如如果我们想要设置一个request body有两种可用的方法:用一个BodyInserter或者把这个工作委托给Publisher
```java
WebClient.RequestHeadersSpec requestSpec1 = WebClient
.create()
.method(HttpMethod.POST)
.uri("/resource")
.body(BodyInserters.fromPublisher(Mono.just("data")), String.class);
WebClient.RequestHeadersSpec<?> requestSpec2 = WebClient
.create("http://localhost:8080")
.post()
.uri(URI.create("/resource"))
.body(BodyInserters.fromObject("data"));
```
BodyInserter 是一个接口负责向request body中插入请求的body值
我们还可以设置MultiValueMap数据作为request body值
```java
LinkedMultiValueMap map = new LinkedMultiValueMap();
map.add("key1", "value1");
map.add("key2", "value2");
BodyInserter<MultiValueMap, ClientHttpRequest> inserter2
= BodyInserters.fromMultipartData(map);
```
或者插入一个对象
```java
BodyInserter<Object, ReactiveHttpOutputMessage> inserter3
= BodyInserters.fromObject(new Object());
```
在设置request body之后我们可以设置content type、 length、cookies、headers
此外它还支持最常用的头文件如“If-None-Match”、“If-Modified-Since”、“Accept”和“Accept- charset”。
```java
WebClient.ResponseSpec response1 = uri1
.body(inserter3)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML)
.acceptCharset(Charset.forName("UTF-8"))
.ifNoneMatch("*")
.ifModifiedSince(ZonedDateTime.now())
.retrieve();
```
### 获取Response
最后一个步骤是发送请求和接收响应,这可以通过exchange或retrieve方法来完成
* exchange方法提供了ClientResponse以及它的status和header
* 而retrieve方法是直接获取body的内容:
```java
String response2 = request1.exchange()
.block()
.bodyToMono(String.class)
.block();
String response3 = request2
.retrieve()
.bodyToMono(String.class)
.block();
```
需要注意的是bodyToMono方法如果状态代码是4xx(客户端错误)或5xx(服务器错误)它将抛出一个WebClientException。
* Monos.block()方法来订阅和检索与响应一起发送的实际数据
* Monos.subscribe()非阻塞式获取响应结果
```java
@Test
public void testSubscribe() {
Mono<String> mono = WebClient
.create()
.method(HttpMethod.GET)
.uri("http://localhost:8080/hello")
.retrieve()
.bodyToMono(String.class);
mono.subscribe(WebClientTest::handleMonoResp);
}
//响应回调
private static void handleMonoResp(String monoResp) {
System.out.println("请求结果为:" + monoResp);
}
```
注意在使用新版的spring boot 2.4.0,可能会出现如下错误:
```java
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
```
可以通过修改application.yaml配置文件,增加缓冲区大小(测试发现不起作用)
```java
spring.codec.max-in-memory-size: 5MB
```
通过代码配置缓冲区大小
```java
WebClient client = WebClient.builder()
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> {
configurer.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024) ; }
)
.build())
.build();
```
设置缓冲区大小为16MB

View File

@@ -1,435 +0,0 @@
# WebFlux
> 这一块了解就行,不用掌握。
## 1 WebFlux简介
> 第一轮:就是看视频教程学会所有技术的原理和基本使用方法
> 第二轮阅读官方的文档尤其是Spring、Java、Maven等掌握编程的细节。
### 简介
Spring5添加的新模块。用于web开发的功能与SpringMVC类似。Webflux使用与当前比较流行的响应式编程出现的框架。
传统的Web框架比如SpringMVC基于Servlet容器Webflux是一种异步非阻塞的框架。异步非阻塞的框架在Servlet3.1后才支持)
其和虚拟式基于Reactor的相关API实现的。
### 异步非阻塞
我更喜欢反过来理解这里。同步异步是针对被调用者的,阻塞和非阻塞是针对调用者的。
* **阻塞和非阻塞针对调用者**。阻塞和非阻塞指的是调用者(程序)在等待返回结果(或输入)时的状态。阻塞时,在调用结果返回前,当前线程会被挂起,并在得到结果之后返回。非阻塞时,如果不能立刻得到结果,则该调用者不会阻塞当前线程。因此对应非阻塞的情况,调用者需要定时轮询查看处理状态。阻塞: 阻塞就是发起一个请求,调用者一直等待请求结果返回,也就是当前线程会被挂起,无法从事其他任务,只有当条件就绪才能继续。非阻塞: 非阻塞就是发起一个请求,调用者不用一直等着结果返回,可以先去干其他事情。。
* **异步和同步是针对被调用者**,同步: 同步就是发起一个调用后,被调用者未处理完请求之前,调用不返回。异步: 异步就是发起一个调用后,立刻得到被调用者的回应表示已接收到请求,但是被调用者并没有返回结果,此时我们可以处理其他的请求,被调用者通常依靠事件,回调等机制来通知调用者其返回结果。
### 与SpringMVC对比
![](image/2022-10-12-19-28-38.png)
* 异步非阻塞,在有限的资源下,能够处理更多的请求,提高系统地吞吐量。
* 函数式编程。Java最基本的编程模式。能够使用Java函数式编程的特点。
* 两个框架都可以使用注解方式运行都可以运行在Tomcat等Servlet容器中。但SpringMVC采用命令式编程WebFlux使用响应式编程。
### 使用场景:网关
* 需要处理大量的请求。所有客户调用网关,网关负责调用其他的组件。可以使用异步的方式。
## 2 响应式编程
### 响应式编程定义
响应式编程是一种面向数据流和变化产波的编程范式。
意味着可以在编程语言很方便地表达静态或者动态的数据流,
![](image/2022-10-12-19-35-03.png)
一个响应式编程的典型例子。D1=B1+C1。当B1的值修改后D1的值也会修改。B1的数据变化流向了D1。
### Java8响应式编程
是要使用观察者模式实现了响应式编程。使用响应式编程Observer,Observable实现。
```java
/**
* Alipay.com Inc.
* Copyright (c) 2004-2022 All Rights Reserved.
*/
package com.ykl.shangguigu08.reactor;
import java.util.Observable;
/**
* @author yinkanglong
* @version : ObserverDemo, v 0.1 2022-10-12 19:47 yinkanglong Exp $
*/
public class ObserverDemo extends Observable {
/**
* 通过Java8中的类实现响应式编程。
* 简单来说,就是观察值模式。
* @param args
*/
public static void main(String[] args) {
ObserverDemo observerDemo = new ObserverDemo();
observerDemo.addObserver((o,arg)->{
System.out.println("发生变化");
});
observerDemo.addObserver((o,arg)->{
System.out.println("准备改变");
});
observerDemo.setChanged();
observerDemo.notifyObservers();
}
}
```
### java9响应式编程
主要通过Flow类的sub和sub订阅消息实现响应式编程。
> 感觉这个响应式编程和awt控件的点击相应式操作很相似。但是不是启动新的线程。
```
```
### 响应式编程Reator实现
* 响应式编程操作Reactor是满足Reactive规范框架
* Reactor有两个核心类Mono和Flux这两个类实现接口Publisher提供丰富操作符号Flux对象实现发布返回N个元素。Mono实现发布者返回0或者1个元素。
* Flux和Mono都是数据流的发布者。能够发出三种信号
* 元素值
* 完成信号。一种终止信号。订阅者数据流已经结束了。
* 错误信号。一种终止信号。终止数据流并把错误信息传递给订阅者。
![](image/2022-10-13-10-22-26.png)
三种信号的特点
* 错误信号和完成信号都是终止信号不能共存。
* 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示空数据流
* 如果没有错误信号,没有完成信号,表示无限数据流。
### 实例Flux&Mono
引入相关的依赖
```
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.5.RELEASE</version>
</dependency>
```
进行发布者发布内容
```java
package com.ykl.shangguigu08.reactor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
/**
* @author yinkanglong
* @version : TestReactor, v 0.1 2022-10-13 10:25 yinkanglong Exp $
*/
public class TestReactor {
public static void main(String[] args) {
//reactor中的核心语法
Flux.just(1,2,3,4);
Mono.just(1);
//其他方法
Integer[] array = {1,2,3,4};
Flux.fromArray(array);
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);
}
}
```
* just等发布方法只是声明了数据流。只有声明了订阅者才会触发数据流不订阅就不会触发。
```
Flux.just(1,2,3,4).subscribe(System.out::print);
Mono.just(1).subscribe(System.out::print);
```
### 操作符
对数据流进行一道道操作,成为操作符,比如工厂流水线。
* 操作符map。将元素映射为新的元素。
* 操作符flatmap。元素映射为流。
![](image/2022-10-13-10-38-26.png)
![](image/2022-10-13-10-41-45.png)
## 4 WebFlux执行流程和核心API
### Netty的基本原理
SpringWebflux基于Reactor默认使用容器NettyNetty是高性能的NIO框架异步非阻塞框架。
1. BIO阻塞
![](image/2022-10-13-11-44-28.png)
2. NIO非阻塞
![](image/2022-10-13-11-44-49.png)
### SpringWebFlux
* SpringWebflux核心控制器DispatchHandler实现接口WebHandler
![](image/2022-10-13-11-48-44.png)
### 关键类
DispatcherHandler负责请求处理。有三个核心类。
![](image/2022-10-13-11-51-20.png)
* HandlerMappingreactor反应器请求查询到处理方法。
* HandlerAdapter真正负责请求处理processor部分
* HandlerResultHandler对结果进行处理
### 函数式编程实现
两个核心接口。
* RouterFunction 路由处理
* HandlerFunction处理函数
## 5 WebFlux基于注解的编程的实现
### 创建WebFlux项目
1. 创建Springboot项目引入webflux的依赖
```xml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>shangguigu09</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>shangguigu09</name>
<description>shangguigu09</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
```
2. 在配置文件中设置启动端口号8081
```yaml
server.port =8081
```
3. 从上到下设计代码:创建接口和实现类
```java
@Service
public class UserServiceImpl implements UserService {
private final Map<Integer,User> users = new HashMap<>();
public UserServiceImpl() {
this.users.put(1,new User("lucy","nan",10));
this.users.put(2,new User("mary","nv",38));
this.users.put(3,new User("jack","nv",32));
}
@Override
public Mono<User> getUserById(int id) {
return Mono.justOrEmpty(this.users.get(id));
}
@Override
public Flux<User> getAllUser() {
return Flux.fromIterable(this.users.values());
}
@Override
public Mono<Void> savaUserInfo(Mono<User> userMono) {
return userMono.doOnNext(person->{
int id = users.size() + 1;
users.put(id,person);
}).thenEmpty(Mono.empty());
}
}
```
4. 从下到上实现代码:实现业务逻辑
```java
@RestController
public class UserController {
@Autowired
private UserService userService;
//id
@GetMapping("/user/{id}")
public Mono<User> getUserById(@PathVariable int id){
return userService.getUserById(id);
}
//all
@GetMapping("/user")
public Flux<User> getAllUser(){
return userService.getAllUser();
}
//tianjian
@GetMapping("/saveuser")
public Mono<Void> saveUser(@RequestBody User user){
Mono<User> userMono = Mono.just(user);
return userService.savaUserInfo(userMono);
}
}
```
### 实现说明
* SpringMVC范式同步阻塞方式基于SpringMVC+Servlet+Tomcat
* SpringWebflux方式异步非阻塞方式基于SpringMVCWebflux+Reactor+Netty
## 6 WebFlux基于函数的编程的实现
### 简要说明
> bio,nio,aio
在使用函数式编程,需要自己初始化服务器
基于函数式编程模型的时候,有两个核心接口。
* RouterFunction 实现路由功能请求转发给对应的handler
* HandlerFunction 处理请求生成响应函数。
核心任务定义两个函数式接口的实现,并启动需要的服务器。
SpringWebFlux的请求和响应是
* ServerRequest
* ServerResponse
### 实现流程
1. 从上到下实现业务bean
2. 创建handler实现Mono方法
```java
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService){
this.userService = userService;
}
//根据id
public Mono<ServerResponse> getUserById(ServerRequest request){
//获取id值
int userid = Integer.valueOf( request.pathVariable("id"));
Mono<ServerResponse> notFound = ServerResponse.notFound().build();
//调用service方法取得数据
Mono<User> userMono = this.userService.getUserById(userid);
//UserMono进行转换返回。Reactor操作符
return userMono.flatMap(person->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(fromObject(person)))
.switchIfEmpty(notFound);
}
//所有用户
public Mono<ServerResponse> getAllUsers(){
Flux<User> users = this.userService.getAllUser();
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users,User.class);
}
//添加
public Mono<ServerResponse> saveUser(ServerRequest request){
Mono<User> userMono = request.bodyToMono(User.class);
return ServerResponse.ok().build(this.userService.savaUserInfo(userMono));
}
}
```
3. 创建并初始化服务器设置路由和handler
```java
public class Server {
//创建路由
public RouterFunction<ServerResponse> route(){
UserService userService = new UserServiceImpl();
UserHandler handler = new UserHandler(userService);
return RouterFunctions.route(GET("/users/{id}").and(accept(MediaType.APPLICATION_JSON)),handler::getUserById);
// .andRoute(GET("users").and(accept(MediaType.APPLICATION_JSON)),handler::getAllUsers)
// .andRoute(GET("saveuser").and(accept(MediaType.APPLICATION_JSON)),handler::saveUser);
}
public void createReactorServer(){
RouterFunction<ServerResponse> route = route();
HttpHandler httpHandler = toHttpHandler(route);
ReactorHttpHandlerAdapter reactorHttpHandlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer httpServer = HttpServer.create();
httpServer.handle(reactorHttpHandlerAdapter).bindNow();
}
public static void main(String[] args) throws Exception{
Server server = new Server();
server.createReactorServer();
System.out.println("enter to exit");
System.in.read();
}
}
```
### WebClient调用
```java
public class Client {
public static void main(String[] args) {
WebClient webClient = WebClient.create("http://127.0.0.1:62418");
User userMono = webClient.get().uri("/users/{id}", "1").accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).block();
System.out.println(userMono.getName());
}
}
```

View File

@@ -1,5 +1,8 @@
## 1 原理说明
类似概念
* 可观察对象、观察者。(观察者模式)
* 发布者、订阅者。(发布订阅机制)
* 事件、响应。(事件驱动、事件监听机制、响应式编程)
### 事件驱动模型和观察者模式
一下名称具有相同的含义,都是观察着模式在不同场景下的实现。
* Spring的事件驱动模型

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB