#!/usr/bin/env python
# coding: utf-8

import os
import numpy as np
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Dropout, Lambda, Cropping2D, BatchNormalization
from keras.optimizers import *
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
#from keras import backend as keras
from keras import backend as K
# from data import *
# from keras.utils import plot_model
# import matplotlib.pyplot as plt
import time
from keras.preprocessing import image
from keras.models import *
#from keras.layers import *

from keras.optimizers import adam_v2

smooth = 1e-3  # avoids division by zero in dice_coef
npy_path = "data/npydata"
os.makedirs(npy_path, exist_ok=True)
os.makedirs("results", exist_ok=True)  # predictions and exported PNG masks are written here

def dice_coef(y_true, y_pred):
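	# Soft Dice coefficient on flattened tensors: 2*sum(y_true*y_pred) / (sum(y_true) + sum(y_pred)), smoothed to avoid 0/0.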
	y_true_f = K.flatten(y_true)
	y_pred_f = K.flatten(y_pred)
	intersection = K.sum(y_true_f * y_pred_f)
	return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def dice_coef_loss(y_true, y_pred):
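	# Dice loss = 1 - Dice coefficient; can be passed to model.compile (see the commented-out compile call in get_unet).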
	return 1 - dice_coef(y_true, y_pred)


def load_validation_data():
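	# Loads pre-built .npy arrays; volumes are rescaled to [0, 1], while labels are assumed to already be binary
	# {0, 1} masks (hence the commented-out rescaling/thresholding below).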
	print('load validation images...')
	imgs_validation = np.load(os.path.join(npy_path,"validation_volumes.npy"))
	imgs_mask_validation = np.load(os.path.join(npy_path, "validation_labels.npy"))
	imgs_validation = imgs_validation.astype('float32')
	imgs_mask_validation = imgs_mask_validation.astype('float32')
	imgs_validation /= 255
	# imgs_mask_validation /= 255
	# imgs_mask_validation[imgs_mask_validation > 0.5] = 1
	# imgs_mask_validation[imgs_mask_validation <= 0.5] = 0
	return imgs_validation,imgs_mask_validation

def load_train_data():
	print('load train images...')
	imgs_train = np.load(os.path.join(npy_path, "train_volumes.npy"))
	imgs_mask_train = np.load(os.path.join(npy_path, "train_labels.npy"))
	imgs_train = imgs_train.astype('float32')
	imgs_mask_train = imgs_mask_train.astype('float32')
	imgs_train /= 255
	# imgs_mask_train /= 255
	# imgs_mask_train[imgs_mask_train > 0.5] = 1
	# imgs_mask_train[imgs_mask_train <= 0.5] = 0
	return imgs_train,imgs_mask_train

def load_test_data():
	print('-'*30)
	print('load test images...')
	print('-'*30)
	imgs_test = np.load(os.path.join(npy_path, "test_volumes.npy"))
	imgs_test = imgs_test.astype('float32')
	imgs_test /= 255
	return imgs_test

class myUnet(object):
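	"""2-D U-Net for binary segmentation of single-channel 128 x 128 images."""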

	def __init__(self, img_rows = 128, img_cols = 128):
		self.img_rows = img_rows
		self.img_cols = img_cols

	def load_data(self):
		#mydata = dataProcess(self.img_rows, self.img_cols)
		imgs_train, imgs_mask_train = load_train_data()
		imgs_validation, imgs_mask_validation = load_validation_data()
		imgs_test = load_test_data()
		return imgs_train, imgs_mask_train, imgs_validation, imgs_mask_validation, imgs_test

	def get_unet(self):
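		# Standard U-Net: a contracting path with 64/128/256/512 filters, a 1024-filter bottleneck (dropout on the
		# two deepest blocks), and an expanding path that upsamples and concatenates the matching encoder features,
		# ending in a 1x1 sigmoid convolution that outputs a per-pixel foreground probability.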
		inputs = Input((self.img_rows, self.img_cols,1))
		conv1 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(inputs)
		print ("conv1 shape:",conv1.shape)
		conv1 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv1)
		print ("conv1 shape:",conv1.shape)
		#norm1 = BatchNormalization(axis=-1)
		pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
		print ("pool1 shape:",pool1.shape)
		#
		conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool1)
		print ("conv2 shape:",conv2.shape)
		conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv2)
		print ("conv2 shape:",conv2.shape)
		#norm2 = BatchNormalization(axis=-1)
		pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
		print ("pool2 shape:",pool2.shape)
		#
		conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool2)
		print ("conv3 shape:",conv3.shape)
		conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv3)
		print ("conv3 shape:",conv3.shape)
		#norm3 = BatchNormalization(axis=-1)
		pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
		print ("pool3 shape:",pool3.shape)
		#
		conv4 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool3)
		conv4 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv4)
		drop4 = Dropout(0.5)(conv4)
		pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)
		#
		conv5 = Conv2D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool4)
		conv5 = Conv2D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv5)
		drop5 = Dropout(0.5)(conv5)
		#
		# TODO: change elu to relu for all decoder blocks (only this first up-block uses relu so far)
		up6 = Conv2D(512, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(drop5))
		# merge6 = merge([drop4,up6], mode = 'concat', concat_axis = 3)
		merge6 = concatenate([drop4,up6], axis = 3)
		conv6 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge6)
		conv6 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv6)
		#
		up7 = Conv2D(256, 2, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(conv6))
		# merge7 = merge([conv3,up7], mode = 'concat', concat_axis = 3)
		merge7 = concatenate([conv3,up7],  axis = 3)
		conv7 = Conv2D(256, 3, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(merge7)
		conv7 = Conv2D(256, 3, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(conv7)
		#
		up8 = Conv2D(128, 2, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(conv7))
		# merge8 = merge([conv2,up8], mode = 'concat', concat_axis = 3)
		merge8 = concatenate([conv2,up8], axis = 3)
		conv8 = Conv2D(128, 3, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(merge8)
		conv8 = Conv2D(128, 3, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(conv8)
		#
		up9 = Conv2D(64, 2, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(conv8))
		# merge9 = merge([conv1,up9], mode = 'concat', concat_axis = 3)
		merge9 = concatenate([conv1,up9], axis = 3)
		conv9 = Conv2D(64, 3, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(merge9)
		conv9 = Conv2D(64, 3, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(conv9)
		conv9 = Conv2D(2, 3, activation = 'elu', padding = 'same', kernel_initializer = 'he_normal')(conv9)
		#
		conv10 = Conv2D(1, 1, activation = 'sigmoid')(conv9)
		model = Model(inputs = inputs, outputs = conv10)
		# model.compile(optimizer = Adam(lr = 1e-4), loss = dice_coef_loss, metrics = [dice_coef])
		# print ('Adam')
		model.compile(optimizer = adam_v2.Adam(learning_rate = 1e-4), loss = 'binary_crossentropy', metrics = ['accuracy'])
		return model

	def train(self):
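		# End-to-end run: load the .npy datasets, build the network, train with checkpointing of the best
		# validation loss, then predict on the test volumes and save the raw probability maps to results/.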
		print("loading data")
		imgs_train, imgs_mask_train, imgs_validation, imgs_mask_validation, imgs_test = self.load_data()
		print("loading data done")
		model = self.get_unet()
		#print(model.summary())
		print("got unet")
		# checkpoint
		filepath="weights-improvement-{epoch:02d}-{val_loss:.2f}.hdf5"
		model_checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
		callbacks_list = [model_checkpoint]
		print('Fitting model...')
		#history = model.fit(imgs_train, imgs_mask_train, batch_size=4, nb_epoch=130, verbose=1,validation_data=(imgs_validation, imgs_mask_validation), shuffle=True, callbacks=callbacks_list)
		history = model.fit(imgs_train, imgs_mask_train, batch_size=1, epochs=5, verbose=1,validation_data=(imgs_validation, imgs_mask_validation), shuffle=True, callbacks=callbacks_list)
		# history = model.fit(imgs_train, imgs_mask_train, batch_size=1, nb_epoch=3, verbose=1,validation_split=0.2, shuffle=True, callbacks=callbacks_list)
		print(history.history.keys())
		# summarize history for accuracy
		# plt.plot(history.history['dice_coef'])
		# plt.plot(history.history['val_dice_coef'])
		# plt.title('model accuracy')
		# plt.ylabel('Dice')
		# plt.xlabel('epoch')
		# plt.legend(['train', 'validation'], loc='upper left')
		# plt.show()
		
		# # summarize history for loss
		# plt.plot(history.history['loss'])
		# plt.plot(history.history['val_loss'])
		# plt.title('model loss')
		# plt.ylabel('loss')
		# plt.xlabel('epoch')
		# plt.legend(['train', 'validation'], loc='upper left')
		# plt.show()
		
		# model.load_weights('weights-improvement-10-0.96.hdf5')
		print('predict test data')
		# imgs_mask_test = model.predict(imgs_test, batch_size=1, verbose=1)
		imgs_mask_test = model.predict(imgs_test, verbose=1)
		np.save('results/imgs_mask_test.npy', imgs_mask_test)

	def save_img(self):
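		# Converts each predicted probability map in results/imgs_mask_test.npy back to an image
		# (array_to_img rescales it to 0-255) and writes it as results/<index>.png.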
		print("array to image")
		imgs = np.load('results/imgs_mask_test.npy')
		for i in range(imgs.shape[0]):
			img = imgs[i]
			img = image.array_to_img(img)
			#img.save("results/%d.jpg"%(i))
			img.save("results/%d.png"%(i))


mymodel = myUnet()
m = mymodel.get_unet()
m.summary()


if __name__ == '__main__':
	start_time = time.time()
	# model_dice = dice_loss(smooth=1e-5, thresh=0.9)
	myunet = myUnet()
	myunet.train()
	myunet.save_img()
	print("Time to process --- {:.3f} seconds ---".format(time.time() - start_time))