03、Netty学习笔记—(Netty组件学习)(下)

举报
长路 发表于 2022/11/28 08:22:22 2022/11/28
【摘要】 文章目录一、认识Netty1.1 Netty 是什么?1.2 Netty 的作者1.3 Netty 的地位1.4 Netty 的优势二、netty入门程序HelloWorld!2.1、netty入门:客户端->服务端 helloworld2.1.1、服务端2.1.2、客户端2.2、流程梳理2.3、netty-helloworld的各个组件通俗介绍三、组件3.1、EventLoop3.1.1、

3.6.5、写入

常用的方法

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串
  • 带有LE的就是大端写入,不带的则是小端写入。网络编程中的两个名词,代表的是先写高位字节,还是先写低位字节;一般采用大端写入!
    • 大端写入:低位靠后,先写高位的0。
    • 小端写入:低位先写,与大端相反。
  • 对于ByteBuf提供了写入ByteBuf以及stringbuilder、stringbuffer、string的API。

注意点:①这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用。②网络传输,默认习惯是 Big Endian。

demo

案例目的:测试是否能够正常写入字符串、字节等。

/**
     * 03、测试ByteBuf的写入与扩容
     */
public static void writeToByteBufDemo(){
    final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
    buffer.writeBytes("c".getBytes());//写入字节
    final StringBuilder builder = new StringBuilder("hang");
    buffer.writeCharSequence(builder, Charset.defaultCharset());//写入stringbuilder
    buffer.writeCharSequence("lu", Charset.defaultCharset());//写入字符串
    log(buffer);
    //测试扩容
    buffer.writeCharSequence(",helloworld", Charset.defaultCharset());
    log(buffer);
}

image-20220107172737095



3.6.5、扩容

默认若是不指定的话则最大容量是整数的最大值。

扩容规则是

  • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
  • 扩容不能超过 max capacity 会报错


3.6.7、读取

案例目的:读取字节以及标记重复读取

/**
     * 04、测试ByteBuf的读取:包含重复读取某个字节
     */
public static void readByteBufDemo(){
    final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
    buffer.writeBytes("123456789".getBytes());//写入字节
    System.out.println(buffer.readByte());//读取一个字节
    System.out.println(buffer.readByte());
    System.out.println(buffer.readByte());
    System.out.println(buffer.readByte());
    buffer.markReaderIndex();//可标记读索引以及写索引
    buffer.readBytes(4);
    buffer.resetReaderIndex();//重置读索引
    log.debug("读取读索引的字节");
    System.out.println(buffer.readByte());
}

1、读取内容使用read开头的API,这类API会移动读指针。

2、若是使用get开头API,不会移动读指针。

3、若是想要回读或重读可以设置mark标记,同样也可以设置读或写标记!

image-20220107175828328



3.6.8、retain & release (释放ByteBuf)

3.6.8.1、释放分析

由于 Netty 中有堆外内存(指的是直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可。
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存。
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存。

扩展:可达性分析是通过一系列的GC ROOTS对象来连接有用的对象,走过的路径会形成一条链,当有对象到GC ROOTS没有一条引用链的时候就要被回收了。

核心:在实际业务场景中,入站、出站操作中都会使用到ByteBuf,针对于池化的Bytebuf则会将用完之后的ByteBuf还回内存池,来达到内存重用!在入站、出站过程中经历多个handler,其中head、tail handler是netty默认定义好的,两者都能够进行收尾工作(指的是若是最终传递得到的Object msg的对象ByteBuf就会进行自动回收,若是其他类型则不处理)。:

误解:不要觉得头和尾都可以释放我们中途就可以不管bytebuf的释放了,因为其释放时机需要把bytebuf对象一直传到头或尾handler才会释放。若是在中途已经将bytebuf转换成字符串了接着进行下面的传递,此时到tail拿到的仅仅是那个字符串了就不是bytebuf了,既然如此就不会做释放处理。

最合适的释放时机:谁最后拿到bytebuf(传递已对bytebuf进行解析并将解析后的内容向后传递的handler)就要对ByteBuf进行释放。若是从头置尾handler直接都是传递的ByteBuf中间也可以不手动释放,最后也会给我们进行释放,不过最好就是哪里用完了ByteBuf(解析完)就进行释放!



3.6.8.2、源码分析(head、tail)

基本规则是,谁是最后使用者,谁负责 release,详细分析如下

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站 ByteBuf 处理原则
    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则
    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则
    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

tail handler:入站最后执行的处理器

//可以看到实现了ChannelInboundHandler接口
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
    //关注其中的read方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DefaultChannelPipeline.this.onUnhandledInboundMessage(msg);
    }
}
    
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        //使用了一个工具类来进行尝试释放
        ReferenceCountUtil.release(msg);
    }
}

public static boolean release(Object msg) {
    //可以看到会使用instanceOf来判断是否是ByteBuf,因为ByteBuf实现了引用计数的接口,若是是的话就会进行释放
    //public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
    return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

head handler:出站的最后一个handler执行器

//可以注意到其实现了ChannelOutboundHandler、ChannelInboundHandler,则表示又是入站执行器,也是出站执行器。
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
    
    //对于出站就要关注其write方法
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.unsafe.write(msg, promise);
    }
}

//AbstractChannel.class
public final void write(Object msg, ChannelPromise promise) {
            this.assertEventLoop();
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            //这里做了一次是否为出栈BUffer判定。若是的话则表示当前方法是在出站时进行调用的。
            if (outboundBuffer == null) {
                this.safeSetFailure(promise, AbstractChannel.WRITE_CLOSED_CHANNEL_EXCEPTION);
                //可以看到这里也进行了释放操作,内部源码实际上就是对msg类型进行判断,若是ByteBuf就释放。
                ReferenceCountUtil.release(msg);
            } else {
                ...
            }
        }


3.6.8、零拷贝

netty的零拷贝体现在网络数据传输、文件传输以及数据操作的优化,下面就主要介绍数据操作的零拷贝优化。

  • netty中的零拷贝主要也是指减少数据复制,提升性能。

通过wrap(),可将byte[]数组、ByteBuf、ByteBuffer等包装成一个Netty ByteBuf对象,避免了复制拷贝操作。

通过duplicate(),可将整个ByteBuf进行零拷贝。

通过slice(),可将ByteBuf分解为多个共享同一个存储区域的ByteBuf, 避免内存的拷贝。

通过CompositeByteBuf,可将多个ByteBuf进行合并。



3.6.8.1、slice:切割

slice是数据零拷贝的体现之一

①实际应用

案例目的:对某个Bytebuf进行数据分割放置到两个ByteBuf中。

/**
     * 实际应用:零拷贝获取head、body
     */
public static void practicalUse(){
    final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
    buffer.writeCharSequence("head,body", Charset.defaultCharset());
    //若是要对某一个ByteBuf进行切割操作,第一部分要的是前5个,第二部分要的是后5个
    //应用场景:对请求body、head进行切割。分割得到的两个部分实际上使用的是原先Buffer的共享内存
    final ByteBuf front = buffer.slice(0, 4);//第一个参数是切割的位置,第二个参数是切割的数量
    log(front);
    final ByteBuf end = buffer.slice(5, 4);
    log(end);
}

image-20220107225010376


②修改切割得到的某个ByteBuf位置内容也会影响源ByteBuf;切割得到的ByteBuf无法写入

/**
     * Slice切片得到的ByteBuf进行测试
     */
public static void sliceTest(){
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
    buffer.writeBytes(new byte[]{1,2,3,4});
    final ByteBuf sliceBuf = buffer.slice(0, 4);
    //1、修改切片得到的ByteBuf也会影响原始的ByteBuf,因为使用的是同一块内存
    sliceBuf.setByte(0,6);
    log(buffer);
    //2、无法对切片进行write操作,会抛出异常IndexOutOfBoundsException
    sliceBuf.writeByte(10);
}

image-20220107232813766


③release()与retain()应用场景

release()与retain()可对使用相同内存的ByteBuf同时进行引用计数!

/**
     * release()与retain()使用
     */
public static void sliceTest2(){
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
    buffer.writeBytes(new byte[]{1,2,3,4});
    final ByteBuf sliceBuf = buffer.slice(0, 4);
    //这里引用计数+1,对于原ByteBuf以及切割得到的ByteBuf都有影响,因为是占用的同一块内存
    sliceBuf.retain();//引用计数+1
    buffer.release();
    //若是直接对原ByteBuf进行清理,然后使用切片得到的ByteBuf会抛出异常IllegalReferenceCountException: refCnt: 0
    //若是在release()之后也想正常使用,可以在此之前使用retain()进行引用+1,release()相对于会引用-1,此时就不会真正释放内存,自然也就能欧使用
    log(sliceBuf);
}

image-20220107233031545



3.6.8.2、duplicate:整块

效果:好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import java.nio.charset.Charset;

import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log;

/**
 * @ClassName DuplicateTest
 * @Author ChangLu
 * @Date 2022/1/7 23:32
 * @Description Duplicate:整块零拷贝
 */
public class DuplicateTest {

    public static void main(String[] args) {
        final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
        buffer.writeCharSequence("changlu", Charset.defaultCharset());
        final ByteBuf dupBuf = buffer.duplicate();
        //对整块进行零拷贝的进行修改
        dupBuf.setByte(0,1);
        log(buffer);//测试源ByteBuf受到影响
    }

}

效果:

image-20220107233925077



3.6.8.3、copy:深拷贝(非零拷贝)

copy:就是对整个ByteBuf进行深拷贝,拷贝过后的能够进行写入,并且修改的位置内容不会影响源位置。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import java.nio.charset.Charset;

import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log;

/**
 * @ClassName CopyTest
 * @Author ChangLu
 * @Date 2022/1/7 23:37
 * @Description Copy:整个ByteBuf进行深拷贝
 */
public class CopyTest {

    public static void main(String[] args) {
        final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
        buffer.writeCharSequence("changlu", Charset.defaultCharset());
        //进行深拷贝
        final ByteBuf copyBuf = buffer.copy();
        copyBuf.setByte(0,1);
        copyBuf.writeByte(6);
        //测试源buffer
        log(buffer);
        //测试深拷贝得到buffer
        log(copyBuf);
    }

}

效果:

image-20220107234215489



3.6.8.4、CompositeBuffer:组装ByteBuf

CompositeByteBuf是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

功能:可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。注意要设置true来让其调整读,写指针。

案例:包含两个测试

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

import java.nio.charset.Charset;

import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log;

/**
 * @ClassName CompositeBufferTest
 * @Author ChangLu
 * @Date 2022/1/7 23:48
 * @Description CompositeBuffer:零拷贝之一,合并ByteBuf
 */
public class CompositeBufferTest {

    public static void main(String[] args) {
        final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
        buffer.writeCharSequence("changlu", Charset.defaultCharset());

        final ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer(20);
        buffer1.writeCharSequence("liner", Charset.defaultCharset());

        //效率较低方案:直接通过writeBytes()写入字节方式写入
//        log(ByteBufAllocator.DEFAULT.buffer(20).writeBytes(buffer).writeBytes(buffer1));

        //零拷贝:合并两个Buffer到一个Buffer中,使用的共享内存
        final CompositeByteBuf comBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
        //测试一:不设置true
//        comBuf.addComponents(buffer, buffer1);//若是不设置true,则不会自动调整读、写指针位置造成数据不会加进来
        //测试二:设置true
        comBuf.addComponents(true, buffer, buffer1);
        log(comBuf);
    }

}

效果

测试一:

image-20220107235513472

测试二:

image-20220107235538277



3.6.8.5、工具类Unpooled(提供了非池化的 ByteBuf 创建、组合、复制等操作)

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作。

这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf。

案例目的:测试组合方法wrappedBuffer

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log;

/**
 * @ClassName UnpooledTest
 * @Author ChangLu
 * @Date 2022/1/7 23:59
 * @Description Unpooled:非池化ByteBuf进行零拷贝的工具类
 */
public class UnpooledTest {

    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
        buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
        buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

        // 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
        ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
        buf3.setByte(0,6);
        log(buf1);
    }

}

效果

image-20220108000210197



3.6.9、ByteBuf优势汇总

1、池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能。

2、读写指针分离,不需要像 ByteBuffer 一样切换读写模式。

3、实现自动扩容。

4、支持链式调用,使用更流畅。

5、很多地方体现零拷贝,例如 wrap、slice、duplicate、CompositeByteBuf。



案例、回显服务器(双向通信)

描述+code(netty)

前提描述

实现功能:客户端向服务器发什么,服务端就返回什么。

出现的问题bytebuf的释放问题,下面是问题和解答(个人见解)。

  • 服务器接收到客户端发来的数据,是否要手动释放?
    • 若是不手动调用ctx.fireChannelRead(),就不会走到tail handler!(debug测试测出来)一般两种情况,①若是在该handler中使用完了ByteBuf,那么就直接手动释放;②若是没有进行解析之类的操作,那么可以直接传递到后面handler,也就是tail handler也会帮你进行释放操作,ctx.fireChannelRead()。
  • 回显业务必然会创建一个ByteBuf对象,是否需要手动释放?
    • 对于自己创建的ByteBuf,则需要进行手动释放,在这里回显业务是调用了writeAndFlush这是一个异步操作,那么添加一个监听器当写入完毕之后就进行手动释放!

code

服务器:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

/**
 * @ClassName Server
 * @Author ChangLu
 * @Date 2022/1/8 9:42
 * @Description echoserver:提供回显服务的服务器,就是收到什么,然后就发送什么的程序。
 */
@Slf4j
public class Server {

    public static void main(String[] args) throws InterruptedException {
        new ServerBootstrap()
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf)msg;
                                log.debug("收到客户端发送数据:{}", buf.toString(Charset.defaultCharset()));
                                final ByteBuf response = ctx.alloc().buffer();
                                response.writeBytes(buf);
                                //向客户端回发数据:需要手动释放
                                ctx.writeAndFlush(response).addListener((future)->{
                                    //释放ByteBuf
                                    ReferenceCountUtil.release(response);
                                });
                                //向后传递让Tail handler来进行释放msg
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .bind(8080).sync();
        System.out.println("服务器启动成功!");
    }

}

客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;
import java.util.Scanner;

/**
 * @ClassName Client
 * @Author ChangLu
 * @Date 2022/1/8 9:49
 * @Description Client:客户端连接
 */
@Slf4j
public class Client {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Channel channel = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());//String=>ByteBuf
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                log.debug("收到服务端发送的数据:{}", buffer.toString(Charset.defaultCharset()));
                                //同理这里也需要进行向后传递进行释放ByteBuf
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                }).connect("127.0.0.1", 8080).sync().channel();
        log.debug("客户端连接成功:{}", channel);
        channel.closeFuture().addListener(future -> {
            group.shutdownGracefully();
        });

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }).start();
    }
}

效果:

image-20220108102812905

image-20220108102841360

回显效果:

image-20220108102914249

image-20220108102938063



扩展:读写误解解答(含socket实现)

只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,这是不正确的。

实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 BB 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读。

案例demo

案例目的:测试同一个Socket的读、写操作是否是双向信号通信,也就是全双工!(通过给写线程打上断点,之后看读线程是否能够正常运行)

Server

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @ClassName Server
 * @Author ChangLu
 * @Date 2022/1/8 10:35
 * @Description 服务端:接收到连接之后,启动读写线程
 */
public class Server {

    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

client:

import java.io.*;
import java.net.Socket;

/**
 * @ClassName Client
 * @Author ChangLu
 * @Date 2022/1/8 10:35
 * @Description 客户端:同样有读写线程,建立连接之后写线程向服务端发送数据,读线程监听服务端发来的数据
 */
public class Client {

    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。