使用 RxJava 进行响应式编程
ReactiveX 是最成熟的反应式编程框架之一,RxJava 是其基于 Java 的实现。让我们看看我们可以用 RxJava 做什么。
反应式编程采用函数式范例和复杂的大型编程功能的层次。这些功能允许在应用程序架构中使用类似功能的语义。ReactiveX 是反应式世界中最强大的项目之一,为语言实现者提供了一组通用规范。本文是对 RxJava(ReactiveX 的 Java 实现)的实践探索。
RxJava 入门
为了测试 RxJava,我们将编写一个命令行应用程序来监视CoinCap开发的公共事件流。该事件流提供了一个 WebSocket API,它就像一个 JSON 格式事件的火炉,适用于各种加密货币交易所上的每笔交易。我们首先简单地获取这些事件并将它们打印到控制台。然后我们将添加一些更复杂的处理来展示 RxJava 的功能。
清单 1 让我们开始使用 Maven 快速入门原型,它为我们的演示应用程序提供了脚手架。
清单 1. Maven 快速入门
mvn archetype:generate -DgroupId=com.infoworld -DartifactId=rxjava -DarchetypeArtifactId=maven-archetype-quickstart
现在我们在目录中存储了一个简单的项目脚手架/rxjava
。我们可以修改pom.xml
以包含我们需要的依赖项。我们还设置了程序的 Java 版本,如清单 2 所示。
清单 2. 修改后的 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.infoworld</groupId>
<artifactId>rxjava</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>rxjava</name>
<url>http://maven.apache.org</url>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>16</source>
<target>16</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.1</version>
</dependency>
<!-- JSON library for parsing GitHub API response -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
</dependencies>
</project>
要确认一切正常,请输入:$ mvn clean install exec:java -Dexec.mainClass="com.infoworld.App"
。该命令应该会产生经典的“Hello World”输出。
现在,我们将添加用于从 WebSocket 端点提取事件并将其显示在控制台中的基本功能的代码。您可以在清单 3 中看到这段代码。
清单 3. 添加一个功能
package com.infoworld;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
public class App {
public static void main(String[] args) {
String websocketUrl = "wss://ws.coincap.io/trades/binance";
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(websocketUrl).build();
Observable<String> observable = Observable.create(emitter -> {
WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, okhttp3.Response response) {
// WebSocket connection is open
}
@Override
public void onMessage(WebSocket webSocket, String text) {
emitter.onNext(text); // Emit received message
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
// Handle binary message if needed
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
webSocket.close(code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
emitter.onComplete(); // WebSocket connection is closed
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
emitter.onError(t); // WebSocket connection failure
}
});
// Dispose WebSocket connection when the observer is disposed
emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
});
observable
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// No-op
}
@Override
public void onNext(String event) {
// Process each event here
System.out.println(event);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
// Wait indefinitely (or use another mechanism to keep the program running)
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如果运行此程序,您将获得 JSON 事件的逐行输出,每行一个事件。要杀死它,请按Ctrl/Command-c。
事件流建模
清单 3 让我们很好地了解了一些 RxJava 基础知识。我们使用 获取到 binance 推送端点 (wss://ws.coincap.io/trades/binance) 的连接OkHttpClient
,这使得使用 WebSocket API 变得容易。(请参阅OkHttpClient 文档。)
一旦我们打开连接,我们就创建一个新的Observable
. AnObservable
是发出事件的基本类型,是可以观看(或收听)的对象。换句话说,anObservable
是某种事件源,它可以对许多不同的源进行建模。在本例中,我们使用该Observables.create
方法创建一个新源,该方法是一个高阶函数,接受带有单个参数的函数,我们将其命名为emitter
。
该emitter
对象具有我们生成事件流所需的所有回调方法。从某种意义上说,我们希望将 WebSocket 流包装在自定义 RxJava 事件源中。为此,我们需要从中获取所需的回调WebSocketClient
(特别String
是 的版本onMessage
)并调用emitter
我们想要的方法,在本例中为emitter.onNext(text);
. (还有生命周期事件的回调,例如onClosed
和onError
。)
这为我们提供了一个Observable
可以交给任何需要它的人以便了解正在发生的事情的信息。这是一种标准化、可移植的事件流建模方法。此外,它具有高度可塑性,具有一系列功能转换,您很快就会看到。
以下是我们关闭发射器的方法:
emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
});.
以这种方式关闭发射器可确保我们在发射器完成时关闭 WebSocket 连接。
观察事件
为了观察来自 的事件Observable
,我们使用对象subscribe
上的方法Observable
。我们首先调用.subscribeOn(Schedulers.io())
,它告诉 RxJava 在后台线程中运行。这是获得多线程并发的(非常)简单的方法。RxJava 甚至为您使用线程池。
处理事件的主要工作是通过Observer
向subscribe
方法传递 an 来完成。类Observer
是硬币的另一面Observable
:任何想要监视事件的基本类型。在本例中,我们在调用中内联创建一个新的匿名Observer
(用<String>
泛型参数化)subscribe()
。将事件写入控制台的实际工作发生onNext(String)
在Observer
.
操作事件流
现在让我们对流执行一些操作。首先,我们将使用 GSON 将 转换String
为 JSON 对象。然后,我们将使用该对象仅过滤 Solana 区块链上的交易。
为此,我们可以在类上使用map()
和方法。使用,我们可以将字符串逐个事件转换为 JSON 对象。然后,我们在方法中使用 JSON来仅保留那些以“Solana”作为货币的事件(在 CoinCap 规范中,所使用的加密货币位于“base”字段中)。您可以在清单 4 中看到这个新代码。filter()
Observable
map()
filter()
清单 4. 使用 map() 和 filter()
import com.google.gson.Gson;
import com.google.gson.JsonObject;
//… The rest is the same
observable
.subscribeOn(Schedulers.io())
.map(event -> {
Gson gson = new Gson();
JsonObject jsonObject = gson.fromJson(event, JsonObject.class);
return jsonObject;
})
.filter(jsonObject -> {
String base = jsonObject.get("base").getAsString();
return base.equals("solana");
})
.subscribe(
jsonObject -> System.out.println(jsonObject),
Throwable::printStackTrace,
() -> System.out.println("Completed")
);
和调用相当容易阅读 map
。将我们的流变成流。 当他们到达时接受s。它只保留基字段等于“solana”的字段。 filter
map()
String
JsonObject
filter()
JsonObject
清单 4 还向我们展示了该方法的不同重载subscribe()
。这个参数不是一个Observer
实例,而是三个参数:onNext
、onError
和onComplete
函数。它的工作原理是一样的。还有一个仅接受onNext
处理程序的单参数版本。
另外,请注意,map
和 与filter
我们在 Java 流和其他语言(如 JavaScript)中了解和喜爱的函数式操作相同。但现在,我们可以将它们应用于广泛的事件源。事实上,我们可以将这些操作应用到任何可以用Observer
s 和Observable
s 处理的东西上。
结论
RxJava 中的响应式编程以灵活的语法为您提供了强大的力量。它可以在多种情况下使用。正如我们所看到的,它在处理像 CoinCap API 这样的流数据源时非常方便。将事件流作为对象传递的能力是现代软件中的一个重要抽象。每个开发人员都应该了解它。您可以在 GitHub 上找到示例应用程序的完整源代码 。
- 点赞
- 收藏
- 关注作者
评论(0)