Retinal Fundus Vessel Segmentation
【Abstract】 The U-Net architecture is widely used for image segmentation and was originally proposed for segmenting the boundaries of neuronal cells. Compared with the traditional FCN structure, U-Net introduces skip connections, which let shallow spatial information help the decoder recover fine details. Since then, U-Net has been very successful in segmentation and has also been applied to tasks such as compression and super-resolution, demonstrating the effectiveness of this encoder-decoder structure.
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torch
import os
from torch.utils.data import Dataset
class conv_block(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(conv_block, self).__init__()
        # A sequential block: Conv -> BN -> ReLU -> Conv -> BN -> ReLU
        self.conv = nn.Sequential(
            # Arguments: input channels, output channels, kernel size, padding
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),   # convolution
            nn.BatchNorm2d(out_ch),                                # batch normalization
            nn.ReLU(inplace=True),                                 # activation
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),   # second convolution
            nn.BatchNorm2d(out_ch),                                # batch normalization
            nn.ReLU(inplace=True)                                  # second activation
        )
    def forward(self, x):
        x = self.conv(x)
        return x
class down(nn.Module):  # downsampling block
    def __init__(self, in_ch, out_ch):
        super(down, self).__init__()
        # Downsampling consists of max pooling followed by a conv block
        self.max_pool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            conv_block(in_ch, out_ch)
        )
    def forward(self, x):
        x = self.max_pool_conv(x)  # max pooling + convolutions
        return x
class up(nn.Module):  # upsampling block
    def __init__(self, in_ch, out_ch):
        super(up, self).__init__()
        # Upsampling consists of a transposed convolution followed by a conv block;
        # the incoming feature map has in_ch // 2 channels, the other in_ch // 2 come from the skip connection
        self.up = nn.ConvTranspose2d(in_ch // 2, in_ch // 2, 2, stride=2)
        self.conv = conv_block(in_ch, out_ch)
    def forward(self, x1, x2):
        x1 = self.up(x1)
        # U-Net skip connection: concatenate the two feature maps along the channel dimension
        x = torch.cat([x2, x1], dim=1)
        x = self.conv(x)
        return x
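A quick shape check (a minimal sketch with dummy tensors, not part of the original script) makes the channel bookkeeping explicit: the feature map coming from the deeper layer carries in_ch // 2 channels, the skip connection carries the other in_ch // 2, and their concatenation restores the in_ch channels that the following conv_block expects.
if __name__ == "__main__":
    u = up(1024, 256)
    x1 = torch.randn(1, 512, 14, 14)   # feature map from the deeper layer (in_ch // 2 channels)
    x2 = torch.randn(1, 512, 28, 28)   # skip connection from the encoder (in_ch // 2 channels)
    print(u(x1, x2).shape)             # expected: torch.Size([1, 256, 28, 28])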
class outconv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(outconv, self).__init__()
        # Output block: for binary segmentation a single feature map is enough, so out_ch should be 1
        self.conv = nn.Conv2d(in_ch, out_ch, kernel_size=1)
    def forward(self, x):
        x = self.conv(x)
        return x
class UNet(nn.Module):
    def __init__(self, n_channels=3, n_classes=1):
        # Overall pipeline: input -> downsampling -> upsampling with skip connections -> output
        super(UNet, self).__init__()
        self.inc = conv_block(n_channels, 64)
        self.down1 = down(64, 128)    # four downsampling steps progressively reduce the feature map size
        self.down2 = down(128, 256)
        self.down3 = down(256, 512)
        self.down4 = down(512, 512)
        self.up1 = up(1024, 256)      # four upsampling steps restore the resolution while fusing encoder features
        self.up2 = up(512, 128)
        self.up3 = up(256, 64)
        self.up4 = up(128, 64)
        self.outc = outconv(64, n_classes)
    def forward(self, x):  # wire up the layers
        x1 = self.inc(x)
        x2 = self.down1(x1)   # each down block is max pooling followed by a conv block
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)  # each up block is a transposed convolution followed by a conv block
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        x = self.outc(x)
        # Normalize the output to (0, 1) with a sigmoid
        return torch.sigmoid(x)
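As a quick sanity check (a minimal sketch, assuming an input size divisible by 16 so the four pooling steps line up with the four upsampling steps), the network maps an RGB image to a single-channel probability map of the same spatial size:
if __name__ == "__main__":
    net = UNet(n_channels=3, n_classes=1)
    x = torch.randn(1, 3, 224, 224)  # dummy RGB batch
    y = net(x)
    print(y.shape)  # expected: torch.Size([1, 1, 224, 224]), values in (0, 1) after the sigmoid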
class DRIVE_Loader(Dataset):  # dataset loader
    def __init__(self, img_dir, mask_dir, img_size=(512, 512), mode='train'):
        self.img_dir = img_dir      # directory with the input images
        self.mask_dir = mask_dir    # directory with the masks
        self.img_size = img_size    # target image size
        self.mode = mode
        self.file_list = os.listdir(img_dir)  # list of file and directory names in the image folder
        # Split the data into training and validation sets with an 8:2 ratio
        self.split_dataset(0.8)
    def split_dataset(self, ratio):
        # Split into training and validation sets
        train_len = int(ratio * len(self.file_list))
        if self.mode == 'train':
            self.file_list = self.file_list[:train_len]  # training set
        else:
            self.file_list = self.file_list[train_len:]  # validation set
    def __len__(self):  # number of samples in the dataset
        return len(self.file_list)
    def __getitem__(self, item):
        # Build the file paths of the input image and its mask
        img_file = os.path.join(self.img_dir, self.file_list[item])
        mask_file = os.path.join(self.mask_dir, self.file_list[item].replace("tif", "gif"))
        # Read img and mask with Pillow, then resize them to the target size with bilinear interpolation
        img = np.array(Image.open(img_file).resize(self.img_size, Image.BILINEAR))
        mask = np.array(Image.open(mask_file).resize(self.img_size, Image.BILINEAR))
        # If the mask is a single-channel image, add a channel dimension so its shape becomes (H, W, 1)
        if len(mask.shape) == 2:
            mask = np.expand_dims(mask, axis=2)
        # HWC to CHW, the channel order the network expects
        img = img.transpose((2, 0, 1))
        mask = mask.transpose((2, 0, 1))
        mask = mask / 255.0
        # Convert data types
        img = img.astype(np.float32)
        mask = mask.astype(np.float32)
        return torch.from_numpy(img), torch.from_numpy(mask)
if __name__ == "__main__":
    loader = DRIVE_Loader("drive/MyDrive/data/training/images", "drive/MyDrive/data/training/1st_manual", (224, 224))
    img, mask = loader.__getitem__(1)
    # Visualize one sample
    plt.subplot(121)
    img = np.transpose(img.squeeze(), (1, 2, 0))
    # Scale to [0, 1] so plt.imshow displays the float image correctly
    img = img / 255
    print(img.shape)
    plt.imshow(img)
    plt.subplot(122)
    plt.imshow(mask.squeeze(), cmap="gray")  # show the mask as a grayscale image
    plt.show()
class DiceLoss(nn.Module):  # Dice loss, a common loss function for semantic segmentation
    def __init__(self):
        super(DiceLoss, self).__init__()
    def forward(self, output, target, smooth=1e-5):  # arguments: prediction, ground truth, smoothing term
        y_pd = output.view(-1)
        y_gt = target.view(-1)
        intersection = torch.sum(y_pd * y_gt)
        score = (2. * intersection + smooth) / (torch.sum(y_pd) + torch.sum(y_gt) + smooth)
        loss = 1 - score
        return loss
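As a small illustration (a toy example, not part of the original code), the Dice loss is close to 0 when prediction and ground truth overlap perfectly and close to 1 when they do not overlap at all:
if __name__ == "__main__":
    pred = torch.tensor([1.0, 1.0, 0.0, 0.0])
    gt = torch.tensor([1.0, 1.0, 0.0, 0.0])
    print(DiceLoss()(pred, gt).item())        # ~0.0: perfect overlap
    print(DiceLoss()(pred, 1.0 - gt).item())  # ~1.0: no overlap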
class DiceBCELoss(nn.Module):  # combined loss: a weighted sum of BCE loss and Dice loss
    def __init__(self, weight_ce=0.6):  # argument: weight given to the BCE term
        super(DiceBCELoss, self).__init__()
        self.weight_ce = weight_ce
        self.ce = nn.BCELoss()
        self.dc = DiceLoss()
    def forward(self, net_output, target):  # arguments: prediction, ground truth
        dc_loss = self.dc(net_output, target)
        ce_loss = self.ce(net_output, target)
        result = self.weight_ce * ce_loss + (1 - self.weight_ce) * dc_loss
        return result
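nn.BCELoss expects probabilities in (0, 1), so this combined loss is meant to be applied to the sigmoid output of the UNet; a minimal usage sketch with toy tensors (hypothetical values, not from the original script):
if __name__ == "__main__":
    criterion = DiceBCELoss(weight_ce=0.6)
    pred = torch.rand(2, 1, 8, 8)                 # stand-in for sigmoid outputs in (0, 1)
    gt = (torch.rand(2, 1, 8, 8) > 0.5).float()   # stand-in for binary masks
    print(criterion(pred, gt).item())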
def rename(img_dir, mask_dir):  # rename the dataset files so image and mask names match
    img_list = os.listdir(img_dir)
    mask_list = os.listdir(mask_dir)
    for f in img_list:  # rename the image files
        os.rename(os.path.join(img_dir, f), os.path.join(img_dir, f.replace("_training.tif", ".tif")))
        # os.rename(src, dst): src is the original name, dst is the new name
    for f in mask_list:  # rename the mask files
        os.rename(os.path.join(mask_dir, f), os.path.join(mask_dir, f.replace("_manual1.gif", ".gif")))
if __name__ == "__main__":
    rename("drive/MyDrive/data/training/images", "drive/MyDrive/data/training/1st_manual")
from torch.utils.data import Dataset
from torch.optim import Adam
from torch.utils.data import DataLoader
import os
# Select which GPU is visible
#os.environ["CUDA_VISIBLE_DEVICES"] = "1"
def train():
    # Number of training epochs
    epoch = 100
    # Image directory
    img_dir = "drive/MyDrive/data/training/images"
    # Mask directory
    mask_dir = "drive/MyDrive/data/training/1st_manual"
    # Input image size for the network
    img_size = (512, 512)
    # Create the training and validation loaders
    tr_loader = DataLoader(DRIVE_Loader(img_dir, mask_dir, img_size, 'train'), batch_size=4, shuffle=True, num_workers=2, pin_memory=True, drop_last=True)
    val_loader = DataLoader(DRIVE_Loader(img_dir, mask_dir, img_size, 'val'), batch_size=4, shuffle=True, num_workers=2, pin_memory=True, drop_last=True)
    # Define the loss function
    criterion = DiceBCELoss()
    # Move the network to the GPU
    network = UNet().cuda()
    print("Model loaded")
    # Define the optimizer
    optimizer = Adam(network.parameters(), weight_decay=0.0001)
    best_score = 1.0
    for i in range(epoch):
        # Training mode: BatchNorm updates its running statistics and Dropout is active
        network.train()
        train_step = 0
        train_loss = 0
        val_loss = 0
        val_step = 0
        for batch in tr_loader:
            # Read the images and masks of the current batch
            imgs, mask = batch
            # Move the data to the GPU
            imgs = imgs.cuda()
            mask = mask.cuda()
            # Forward pass: feed the images through the network to get a prediction
            mask_pred = network(imgs)
            # Compute the loss between the prediction and the mask
            loss = criterion(mask_pred, mask)
            # Accumulate the training loss
            train_loss += loss.item()
            train_step += 1
            # Clear the gradients
            optimizer.zero_grad()
            # Backpropagate to compute the gradients
            loss.backward()
            # Apply the Adam update step
            optimizer.step()
        # Evaluation mode: BatchNorm and Dropout switch to inference behaviour
        network.eval()
        # Validation
        with torch.no_grad():
            for batch in val_loader:
                imgs, mask = batch
                imgs = imgs.cuda()
                mask = mask.cuda()
                # Compute the evaluation metric, here the Dice loss
                val_loss += DiceLoss()(network(imgs), mask).item()
                val_step += 1
        # Average the training loss and the validation metric over the epoch
        train_loss /= train_step
        val_loss /= val_step
        # If the validation metric improves on the best value so far, save the current model weights
        if val_loss < best_score:
            best_score = val_loss
            torch.save(network.state_dict(), "checkpoint.pth")
        # Log the progress
        print(str(i), "train_loss:", train_loss, "val_dice_loss:", val_loss)
if __name__ == "__main__":
    train()
import torch
from PIL import Image
import numpy as np
import os
from UNet import UNet
# Select which GPU is visible
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
def test(test_dir, weight_path, outputs_dir, img_size=(512, 512)):
    """
    :param test_dir: directory with the images to predict
    :param weight_path: path of the weight file
    :param outputs_dir: output directory
    :param img_size: image size
    :return:
    """
    # Build the network and move it to the GPU
    network = UNet().cuda()
    # Load the weight file (the trained network)
    network.load_state_dict(torch.load(weight_path))
    # Switch to evaluation mode so BatchNorm uses its running statistics
    network.eval()
    # List the files in the test directory
    file_list = os.listdir(test_dir)
    for f in file_list:
        # Read the image and resize it
        img = np.array(Image.open(os.path.join(test_dir, f)).resize(img_size, Image.BILINEAR))
        # Add a batch dimension
        img = np.expand_dims(img, axis=0)
        # Change the channel order (BHWC -> BCHW)
        img = img.transpose((0, 3, 1, 2))
        # Convert to float
        img = img.astype(np.float32)
        # Run the prediction and move the result from GPU memory to host memory
        with torch.no_grad():
            pred = network(torch.from_numpy(img).cuda()).cpu().numpy()
        print(type(pred))
        print(pred.shape)
        # Binarize the prediction
        pred[pred >= 0.5] = 1
        pred[pred < 0.5] = 0
        # Save the result to the output directory (scaled to 0/255 and cast to uint8 so the mask is viewable)
        Image.fromarray((pred[0, 0, :, :] * 255).astype(np.uint8)).save(os.path.join(outputs_dir, f))
if __name__ == "__main__":
    test("drive/MyDrive/data/test/images", "./checkpoint.pth", "drive/MyDrive/data/test/outputs")