分布式专题——Reactive访问Spring Data R2DBC
【摘要】 @[TOC] 1、Spring Data R2DBC⼀些主要的类ConnectionFactoryDatabaseClientexecute().sql(SQL)inTransaction(db -> {})R2dbcExceptionTranslatorSqlErrorCodeR2dbcExceptionTranslator 1.1、pom<?xml version="1.0" encod...
@[TOC]
1、Spring Data R2DBC
⼀些主要的类
- ConnectionFactory
- DatabaseClient
- execute().sql(SQL)
- inTransaction(db -> {})
- R2dbcExceptionTranslator
- SqlErrorCodeR2dbcExceptionTranslator
1.1、pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zhz</groupId>
<artifactId>reactive-spring-boot-r2dbc-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>reactive-spring-boot-r2dbc-demo</name>
<description>Spring Boot使用reactive集成r2dbc</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- R2DBC依赖 -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
<version>1.0.0.M1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.r2dbc/r2dbc-h2 -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>1.0.0.M6</version>
</dependency>
<!-- R2DBC依赖 -->
<dependency>
<groupId>org.joda</groupId>
<artifactId>joda-money</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Milestone的依赖不在主仓库里 -->
<repositories>
<repository>
<id>spring-milestone</id>
<name>Spring Milestones Repository</name>
<url>https://repo.spring.io/milestone/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
1.2、代码
package com.zhz.reactivespringbootr2dbcdemo.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.money.Money;
import java.util.Date;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Coffee {
private Long id;
private String name;
private Money price;
private Date createTime;
private Date updateTime;
}
package com.zhz.reactivespringbootr2dbcdemo.converter;
import org.joda.money.CurrencyUnit;
import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;
public class MoneyReadConverter implements Converter<Long, Money> {
@Override
public Money convert(Long aLong) {
return Money.ofMinor(CurrencyUnit.of("CNY"), aLong);
}
}
package com.zhz.reactivespringbootr2dbcdemo.converter;
import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;
public class MoneyWriteConverter implements Converter<Money, Long> {
@Override
public Long convert(Money money) {
return money.getAmountMinorLong();
}
}
1.3、测试类
package com.zhz.reactivespringbootr2dbcdemo;
import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyReadConverter;
import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyWriteConverter;
import com.zhz.reactivespringbootr2dbcdemo.model.Coffee;
import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.convert.CustomConversions;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.dialect.Dialect;
import org.springframework.data.r2dbc.function.DatabaseClient;
import org.springframework.data.r2dbc.function.convert.R2dbcCustomConversions;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
@SpringBootApplication
@Slf4j
public class ReactiveSpringBootR2dbcDemoApplication extends AbstractR2dbcConfiguration implements ApplicationRunner {
@Autowired
private DatabaseClient client;
public static void main(String[] args) {
SpringApplication.run(ReactiveSpringBootR2dbcDemoApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
CountDownLatch cdl = new CountDownLatch(2);
client.execute()
.sql("select * from t_coffee")
.as(Coffee.class)
.fetch()
.first()
.doFinally(s -> cdl.countDown())
// .subscribeOn(Schedulers.elastic())
.subscribe(c -> log.info("Fetch execute() {}", c));
client.select()
.from("t_coffee")
.orderBy(Sort.by(Sort.Direction.DESC, "id"))
.page(PageRequest.of(0, 3))
.as(Coffee.class)
.fetch()
.all()
.doFinally(s -> cdl.countDown())
// .subscribeOn(Schedulers.elastic())
.subscribe(c -> log.info("Fetch select() {}", c));
log.info("After Starting.");
cdl.await();
}
@Override
public ConnectionFactory connectionFactory() {
return new H2ConnectionFactory(H2ConnectionConfiguration.builder().inMemory("spring-test").url("jdbc:mysql://119.29.36.141:3306/spring-test?serverTimezone=GMT%2B8").username("root").password("root").build());
}
@Bean
public R2dbcCustomConversions r2dbcCustomConversions() {
Dialect dialect = getDialect(connectionFactory());
CustomConversions.StoreConversions storeConversions =
CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder());
return new R2dbcCustomConversions(storeConversions,
Arrays.asList(new MoneyReadConverter(), new MoneyWriteConverter()));
}
}
2、R2DBC Repository ⽀持
⼀些主要的类
- @EnableR2dbcRepositories
- ReactiveCrudRepository<T, ID>
- @Table / @Id
- 其中的⽅法返回都是 Mono 或者 Flux
- ⾃定义查询需要⾃⼰写 @Query
2.1、pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zhz</groupId>
<artifactId>reactive-spring-boot-r2dbc-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>reactive-spring-boot-r2dbc-demo</name>
<description>Spring Boot使用reactive集成r2dbc</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- R2DBC依赖 -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
<version>1.0.0.M1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.r2dbc/r2dbc-h2 -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>1.0.0.M6</version>
</dependency>
<!-- R2DBC依赖 -->
<dependency>
<groupId>org.joda</groupId>
<artifactId>joda-money</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Milestone的依赖不在主仓库里 -->
<repositories>
<repository>
<id>spring-milestone</id>
<name>Spring Milestones Repository</name>
<url>https://repo.spring.io/milestone/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2、yml
spring:
datasource:
url: jdbc:mysql://119.29.36.141:3306/spring-test?serverTimezone=GMT%2B8
username: root
password: root
hikari:
connection-test-query: SELECT 1
connection-timeout: 60000
idle-timeout: 500000
max-lifetime: 540000
maximum-pool-size: 12
minimum-idle: 10
pool-name: GuliHikariPool
output:
ansi:
enabled: always
management:
endpoints:
web:
exposure:
include: '*'
2.3、代码
package com.zhz.reactivespringbootr2dbcdemo.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.money.Money;
import java.util.Date;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Coffee {
private Long id;
private String name;
private Money price;
private Date createTime;
private Date updateTime;
}
package com.zhz.reactivespringbootr2dbcdemo.converter;
import org.joda.money.CurrencyUnit;
import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;
public class MoneyReadConverter implements Converter<Long, Money> {
@Override
public Money convert(Long aLong) {
return Money.ofMinor(CurrencyUnit.of("CNY"), aLong);
}
}
package com.zhz.reactivespringbootr2dbcdemo.converter;
import org.joda.money.Money;
import org.springframework.core.convert.converter.Converter;
public class MoneyWriteConverter implements Converter<Money, Long> {
@Override
public Long convert(Money money) {
return money.getAmountMinorLong();
}
}
package com.zhz.reactivespringbootr2dbcdemo.repository;
import com.zhz.reactivespringbootr2dbcdemo.model.Coffee;
import org.springframework.data.r2dbc.repository.query.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
public interface CoffeeRepository extends ReactiveCrudRepository<Coffee, Long> {
@Query("select * from t_coffee where name = $1")
Flux<Coffee> findByName(String name);
}
2.4、测试类
package com.zhz.reactivespringbootr2dbcdemo;
import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyReadConverter;
import com.zhz.reactivespringbootr2dbcdemo.converter.MoneyWriteConverter;
import com.zhz.reactivespringbootr2dbcdemo.model.Coffee;
import com.zhz.reactivespringbootr2dbcdemo.repository.CoffeeRepository;
import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.convert.CustomConversions;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.dialect.Dialect;
import org.springframework.data.r2dbc.function.DatabaseClient;
import org.springframework.data.r2dbc.function.convert.R2dbcCustomConversions;
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
@SpringBootApplication
@Slf4j
public class ReactiveSpringBootR2dbcDemoApplication extends AbstractR2dbcConfiguration implements ApplicationRunner {
@Autowired
private CoffeeRepository repository;
public static void main(String[] args) {
SpringApplication.run(ReactiveSpringBootR2dbcDemoApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
coffeeRepository();
}
private void coffeeRepository() throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(2);
repository.findAllById(Flux.just(1L, 2L))
.map(c -> c.getName() + "-" + c.getPrice().toString())
.doFinally(s -> cdl.countDown())
.subscribe(c -> log.info("Find {}", c));
repository.findByName("mocha")
.doFinally(s -> cdl.countDown())
.subscribe(c -> log.info("Find {}", c));
cdl.await();
}
@Override
public ConnectionFactory connectionFactory() {
return new H2ConnectionFactory(H2ConnectionConfiguration.builder().inMemory("spring-test").url("jdbc:mysql://119.29.36.141:3306/spring-test?serverTimezone=GMT%2B8").username("root").password("root").build());
}
@Bean
public R2dbcCustomConversions r2dbcCustomConversions() {
Dialect dialect = getDialect(connectionFactory());
CustomConversions.StoreConversions storeConversions =
CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder());
return new R2dbcCustomConversions(storeConversions,
Arrays.asList(new MoneyReadConverter(), new MoneyWriteConverter()));
}
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)