使用 RxJava 进行响应式编程

举报
千锋教育 发表于 2023/08/01 18:47:24 2023/08/01
【摘要】 ReactiveX 是最成熟的反应式编程框架之一,RxJava 是其基于 Java 的实现。让我们看看我们可以用 RxJava 做什么。反应式编程采用函数式范例和复杂的大型编程功能的层次。这些功能允许在应用程序架构中使用类似功能的语义。ReactiveX 是反应式世界中最强大的项目之一,为语言实现者提供了一组通用规范。本文是对 RxJava(ReactiveX 的 Java 实现)的实践探索...

ReactiveX 是最成熟的反应式编程框架之一,RxJava 是其基于 Java 的实现。让我们看看我们可以用 RxJava 做什么。

企业微信截图_20230801184606.jpg

反应式编程采用函数式范例和复杂的大型编程功能的层次。这些功能允许在应用程序架构中使用类似功能的语义。ReactiveX 是反应式世界中最强大的项目之一,为语言实现者提供了一组通用规范。本文是对 RxJava(ReactiveX 的 Java 实现)的实践探索。

RxJava 入门

为了测试 RxJava,我们将编写一个命令行应用程序来监视CoinCap开发的公共事件流。该事件流提供了一个 WebSocket API,它就像一个 JSON 格式事件的火炉,适用于各种加密货币交易所上的每笔交易。我们首先简单地获取这些事件并将它们打印到控制台。然后我们将添加一些更复杂的处理来展示 RxJava 的功能。

清单 1 让我们开始使用 Maven 快速入门原型,它为我们的演示应用程序提供了脚手架。


清单 1. Maven 快速入门


mvn archetype:generate -DgroupId=com.infoworld -DartifactId=rxjava -DarchetypeArtifactId=maven-archetype-quickstart

现在我们在目录中存储了一个简单的项目脚手架/rxjava。我们可以修改pom.xml以包含我们需要的依赖项。我们还设置了程序的 Java 版本,如清单 2 所示。

清单 2. 修改后的 pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.infoworld</groupId>
  <artifactId>rxjava</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>rxjava</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>16</source>
          <target>16</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
      </dependency>
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.21</version>
    </dependency>
<dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.9.1</version>
    </dependency>
    <!-- JSON library for parsing GitHub API response -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.9</version>
    </dependency>
  </dependencies>
</project>

要确认一切正常,请输入:$ mvn clean install exec:java -Dexec.mainClass="com.infoworld.App"。该命令应该会产生经典的“Hello World”输出。

现在,我们将添加用于从 WebSocket 端点提取事件并将其显示在控制台中的基本功能的代码。您可以在清单 3 中看到这段代码。

清单 3. 添加一个功能


package com.infoworld;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class App {
  public static void main(String[] args) {
    String websocketUrl = "wss://ws.coincap.io/trades/binance";
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder().url(websocketUrl).build();

    Observable<String> observable = Observable.create(emitter -> {
      WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
        @Override
        public void onOpen(WebSocket webSocket, okhttp3.Response response) {
          // WebSocket connection is open
        }
        @Override
        public void onMessage(WebSocket webSocket, String text) {
          emitter.onNext(text); // Emit received message
        }
        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes)           {
        // Handle binary message if needed
        }
        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
          webSocket.close(code, reason);
        }
        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
          emitter.onComplete(); // WebSocket connection is closed
        }
        @Override
        public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
          emitter.onError(t); // WebSocket connection failure
        }
      });
    // Dispose WebSocket connection when the observer is disposed
    emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
    });
    observable
      .subscribeOn(Schedulers.io())
      .subscribe(new Observer<String>() {
      @Override
      public void onSubscribe(Disposable d) {
        // No-op
      }
     @Override
     public void onNext(String event) {
       // Process each event here
       System.out.println(event);
     }
     @Override
     public void onError(Throwable e) {
       e.printStackTrace();
     }

     @Override
     public void onComplete() {
       System.out.println("Completed");
     }
   });

   // Wait indefinitely (or use another mechanism to keep the program running)
   try {
      Thread.sleep(Long.MAX_VALUE);
   } catch (InterruptedException e) {
     e.printStackTrace();
   }
  }
}

如果运行此程序,您将获得 JSON 事件的逐行输出,每行一个事件。要杀死它,请按Ctrl/Command-c


事件流建模

清单 3 让我们很好地了解了一些 RxJava 基础知识。我们使用 获取到 binance 推送端点 (wss://ws.coincap.io/trades/binance) 的连接OkHttpClient,这使得使用 WebSocket API 变得容易。(请参阅OkHttpClient 文档。)

一旦我们打开连接,我们就创建一个新的Observable. AnObservable是发出事件的基本类型,是可以观看(或收听)的对象。换句话说,anObservable是某种事件源,它可以对许多不同的源进行建模。在本例中,我们使用该Observables.create方法创建一个新源,该方法是一个高阶函数,接受带有单个参数的函数,我们将其命名为emitter

emitter对象具有我们生成事件流所需的所有回调方法。从某种意义上说,我们希望将 WebSocket 流包装在自定义 RxJava 事件源中。为此,我们需要从中获取所需的回调WebSocketClient(特别String是 的版本onMessage)并调用emitter我们想要的方法,在本例中为emitter.onNext(text);. (还有生命周期事件的回调,例如onClosedonError。) 

这为我们提供了一个Observable可以交给任何需要它的人以便了解正在发生的事情的信息。这是一种标准化、可移植的事件流建模方法。此外,它具有高度可塑性,具有一系列功能转换,您很快就会看到。 

以下是我们关闭发射器的方法:


emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
});. 

以这种方式关闭发射器可确保我们在发射器完成时关闭 WebSocket 连接。 

观察事件

为了观察来自 的事件Observable,我们使用对象subscribe上的方法Observable。我们首先调用.subscribeOn(Schedulers.io()),它告诉 RxJava 在后台线程中运行。这是获得多线程并发的(非常)简单的方法。RxJava 甚至为您使用线程池。

处理事件的主要工作是通过Observersubscribe方法传递 an 来完成。类Observer是硬币的另一面Observable:任何想要监视事件的基本类型。在本例中,我们在调用中内联创建一个新的匿名Observer(用<String>泛型参数化)subscribe()。将事件写入控制台的实际工作发生onNext(String)Observer.

操作事件流

现在让我们对流执行一些操作。首先,我们将使用 GSON 将 转换String为 JSON 对象。然后,我们将使用该对象仅过滤 Solana 区块链上的交易。

为此,我们可以在类上使用map()和方法。使用,我们可以将字符串逐个事件转换为 JSON 对象。然后,我们在方法中使用 JSON来仅保留那些以“Solana”作为货币的事件(在 CoinCap 规范中,所使用的加密货币位于“base”字段中)。您可以在清单 4 中看到这个新代码。filter()Observablemap()filter()

清单 4. 使用 map() 和 filter()


import com.google.gson.Gson;
import com.google.gson.JsonObject;
//… The rest is the same
observable
  .subscribeOn(Schedulers.io())
  .map(event -> {
    Gson gson = new Gson();
    JsonObject jsonObject = gson.fromJson(event, JsonObject.class);
    return jsonObject;
  })
  .filter(jsonObject -> {
    String base = jsonObject.get("base").getAsString();
    return base.equals("solana");
  })
  .subscribe(
    jsonObject -> System.out.println(jsonObject),
    Throwable::printStackTrace,
    () -> System.out.println("Completed")
  );

和调用相当容易阅读 map。将我们的流变成流。 当他们到达时接受s。它只保留基字段等于“solana”的字段。 filtermap()StringJsonObjectfilter()JsonObject

清单 4 还向我们展示了该方法的不同重载subscribe()。这个参数不是一个Observer实例,而是三个参数:onNextonErroronComplete函数。它的工作原理是一样的。还有一个仅接受onNext处理程序的单参数版本。

另外,请注意,map和 与filter我们在 Java 流和其他语言(如 JavaScript)中了解和喜爱的函数式操作相同。但现在,我们可以将它们应用于广泛的事件源。事实上,我们可以将这些操作应用到任何可以用Observers 和Observables 处理的东西上。

结论

RxJava 中的响应式编程以灵活的语法为您提供了强大的力量。它可以在多种情况下使用。正如我们所看到的,它在处理像 CoinCap API 这样的流数据源时非常方便。将事件流作为对象传递的能力是现代软件中的一个重要抽象。每个开发人员都应该了解它。您可以在 GitHub 上找到示例应用程序的完整源代码 。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。