什么叫响应式编程,响应式编程入门

  什么叫响应式编程,响应式编程入门

  如何解决写爬虫IP受阻的问题?立即使用。

  推荐教程:《java视频教程》

  java响应式编程是什么?

  java响应式编程是

  响应式编程

  作为响应式编程的第一步,微软已经在。网络生态系统。Rx是它在JVM上的实现。

  响应式编程是一种异步编程范式,它通常作为观察者模式的扩展出现在面向对象语言中。

  它关注数据的流动和变化的传播。这意味着使用编程语言很容易表达静态(如数组)或动态(如事件发射源)数据流。

  响应式流

  随着时间的推移,出现了Java的标准化。它是一个规范,为JVM平台上的响应式库定义了一些接口和交互规则。它是响应式流,其接口已经集成到Java 9中,在java.util.concurrent.Flow的父类中响应式流类似于迭代器,但迭代器基于“拉”,而响应式流基于“推”。迭代器的使用实际上是命令式编程,因为由开发人员决定何时调用next()来获取下一个元素。在响应流中,与上述对等的是发布者-订阅者。但是当有新的可用元素时,发布者会将其推送给订阅者。这种“推动”是响应的关键。

  另外,推送元素的操作也是以声明的方式进行的,程序员只需要表达做什么,不用管怎么做。

  发布者使用onNext方法向订阅者推送新元素,使用onError方法通知错误,使用onComplete方法通知它已经结束。

  可以看出错误处理和完成(end)也处理的很好。并且该序列可以通过错误结束来终止。

  这种方式非常灵活。该模式支持0(无)元素/1元素/n(多)元素(包括无限序列,如果时钟滴答)。

  Reactor粉墨登场

  Reactor是第四代响应式库,是响应式编程范式的实现。它用于在JVM平台上基于响应流规范构建非阻塞异步应用程序。

  它极大地实现了JVM(http://www . reactive-streams . org/)上响应流的规范。

  它是具有高效需求管理(以管理“后压力”的形式)的完全非阻塞响应式编程的基石。

  它直接集成了Java功能API,尤其是CompletableFuture、Stream和Duration。

  它支持使用reactor-netty项目的无阻塞跨进程通信,适用于微服务架构,支持HTTP(包括Websockets)、TCP和UDP。

  注意:Reactor需要Java 8

  说了这么多,是不是应该先想想为什么需要这样一个异步响应库?

  阻塞就是浪费

  现代应用程序可以接触到大量并发用户。即使现代硬件的能力不断提高,现代软件的性能仍然是一个关键问题。

  通常有两种方法可以提高程序的性能:

  1,并行化,使用更多线程和更多硬件资源。

  2.提高效率,在目前的资源使用情况下,寻求更高的效率。

  通常,Java开发人员使用阻塞代码来编写程序。这种实用性很好,直到遇到性能瓶颈。

  此时,将引入额外的线程来运行类似的阻塞代码。但是这种扩展方式会引起资源利用上的争议和并发问题。

  更糟糕的是,阻塞浪费资源。如果你仔细观察,一旦一个程序涉及一些延迟(特别是I/O,像数据库请求或网络调用),资源就被浪费了,因为线程现在是空闲的,在等待数据。

  因此,并行化方法并不是灵丹妙药。我们有必要充分发挥硬件的作用,但资源浪费的影响和原因也很复杂。

  异步性来营救

  上面提到的第二种方法是寻求更高的效率,可以作为解决资源浪费问题的方法。

  通过编写异步非阻塞代码,您可以将执行切换到其他活动任务,使用相同的底层资源,然后稍后返回到当前处理。

  但是如何生成到JVM的异步代码呢?Java提供了两种异步编程模型:

  1.回调,异步方法没有返回值,但是会带来一个回调,当结果可用时会调用这个回调。

  2.期货,异步方法立即返回一个FutureT。异步处理过程是计算t的一个值,对它的访问由Future对象包装。该值不是立即可用的,可以轮询该对象以查看t值是否可用。

  两种技术都足够好吗?这并不适用于所有情况。这两种方法都有局限性。

  回调很难放在一起,这将很快使代码难以阅读和维护(被称为“回调地狱”)。

  相比回调,期货好一点,但在构图上还是做得不好。组合多个期货对象是可行的,但并不容易。

  未来也有其他问题。调用get()方法很容易导致另一个阻塞。

  此外,它不支持延迟计算,缺少对多个值的支持,并且缺少高级错误处理。

  从命令式到响应式编程

  像Reactor这样的响应式库的目标是解决JVM上“传统”异步模式的缺点,同时还要注意一些其他方面:

  可组合性和可读性。

  作为数据流,由有钱的运营商运营,直到你订阅,后印,消费者通知生产者投放速度太快,是高层次而不是高价值的抽象,什么都不会发生。

  可组合性和可读性

  可组合性,实际上就是安排多个异步任务的能力,使得前一个任务的结果可以作为后续任务的输入,或者以fork-join的方式执行几个任务,或者这些异步任务可以在更高层次上重用。

  任务安排的能力与代码的可读性和可维护性密切相关。随着异步处理的数量和复杂性的增加,组合和读取代码变得更加困难。

  我们可以看到,回调模型很简单,但是当回调嵌套在回调中,达到多级时,就会变成回调地狱。

  Reactor提供了丰富的组合选项来最小化嵌套层次,使得代码的组织结构能够反映出正在进行什么样的抽象处理,并且通常停留在同一层次。

  装配线类比

  您可以认为响应式应用程序处理数据就像通过装配线一样。反应器既是传送带又是工作站。

  原材料不断地从一个来源(最初的出版商)获得,最终以一个被推送给消费者(订户)的成品结束。

  原材料可以经历许多不同的转变,例如其他中间步骤,或者是更大的装配线的一部分。

  如果某处出现故障或堵塞,出现问题的工作站可以向上游发送通知,以限制原材料的流量(速率)。

  操作员

  在Reactor中,操作员是装配线类比中的工作站。每个操作符向一个发布者添加一些行为,将前一步的发布者包装到一个新的实例中。整个链条就是这样链接起来的。

  因此,数据首先从第一个发布者出来,然后沿着链条向下游移动,并由每个环节进行转换。最后,一个用户结束了这个过程。

  responsive flow规范没有指定操作符,但是Reactor提供了丰富的操作符,这些操作符涉及很多方面,从简单的转换和过滤到复杂的排列和错误处理。

  只要你不订阅,什么都不会发生。

  当您编写发布者链时,默认情况下,数据不会开始进入该链。相反,您只是创建了异步处理的抽象描述。

  通过订阅这个行为(动作),发布者和订阅者被连接起来,然后链中的数据流被触发。

  这是在内部完成的,它通过来自订阅者的请求信号向上游传播,一直向上传播到最初的发布者。

  Reactor核心特性

  Reactor引入了可组合响应的类型,它实现了publisher接口,但也提供了丰富的操作符,即Flux和Mono。

  流动的通量表示0到n个元素。

  Mono,single,表示0或1个元素。

  两者的区别主要是语义上的,表示异步处理的大致基数。

  例如,如果一个http请求只产生一个响应,那么将其表示为MonoHttpResponse显然更有意义,并且它只在0/1的上下文中提供运算符,因为count操作此时显然没有意义。

  操作员可以更改处理的最大基数,并切换到相关类型。例如,虽然FluxT上存在count运算符,但它的返回值是MonoLong。

  FluxT

  FluxT是一个标准的PublisherT,它代表一个异步序列,可以发出0到N个元素,并可以由一个完成信号或错误信号终止。

  与在响应流规范中一样,这三种类型的信号被转换成对下游订户的三个方法onNext、onComplete和onError3的调用。

  这三个方法也可以理解为事件/回调,都是可选的。

  如果没有onNext但有onComplete,则意味着一个空的有限序列。既没有onNext也没有onComplete,表示一个空的无限序列(没有实际用途,可以用来测试)。

  无限序列不一定是空的,例如,Flux.interval(Duration)产生一个FluxLong,它是无限的,并从时钟发出有规律的“滴答”。

  莫诺

  MonoT是一个特殊的PublisherT,它最多发出一个元素,可以由onComplete信号或onError信号终止。

  它提供的操作符只是Flux提供的操作符的子集。类似的,有些运营商(比如合并Mono和Publisher)可以把它切换到一个Flux。

  例如,Mono#concatWith(Publisher)返回一个通量,而Mono#then(Mono)返回另一个Mono。

  Mono可以用来表示无返回值的异步处理(类似Runnable),用MonoVoid表示。

  创建Flux或Mono并订阅它们。

  最简单的方法是使用各自的工厂方法:

  FluxString seq1=Flux.just(foo , bar , foobar );

  ListString iterable=arrays . aslist( foo , bar , foobar );

  flux string seq 2=flux . fromiterable(iterable);

  flux integer numbers fromfivetoseven=flux . range(5,3);

  mono string no data=mono . empty();

  mono string data=mono . just( foo );说到订阅,可以用Java 8的lambda表达式。对于不同的回调,订阅方法有许多不同的变化。

  下面是方法签名:

  //订阅并触发序列

  subscribe();

  //可以处理每个生成的值。

  订阅(消费者?超级T消费);

  //您也可以回应错误

  订阅(消费者?超级T消费者,

  消费者?超级可抛出错误消费者);

  //成功完成后还可以执行一些代码。

  订阅(消费者?超级T消费者,

  消费者?超级可抛出错误消费者,

  可运行的complete consumer);

  //还可以对订阅进行一些操作

  订阅(消费者?超级T消费者,

  消费者?超级可抛出错误消费者,

  可运行的完整消费者,

  消费者?超级订阅Subscription consumer);使用一次性取消订阅

  这些基于lambda的订阅方法都返回一个可处置类型,并通过调用其dispose()来取消该订阅。

  对于通量和单声道,取消是源应该停止产生元素的信号。但是,不能保证立即产生效果,一些源产生元素的速度可能太快,以至于在接收到取消信号之前就已经完成了生产。

  相关文章:《java开发教程》以上什么是java响应式编程?更多详情请关注我们的其他相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: