Recognizing Seam Errors and Breaks with a residual neural network

Introduction

Once a year we offer a class called Forschungsprojekt Industrie 4.0 (Research Project Industry 4.0), which is a practical assignment for the enrolled students. This time the goal of the assignment was to create a quality control system which recognizes errors and breaks on textile seams. Six students enrolled in this specific assignment.

In Figure 1 you see, in the upper left, a sewing machine from above. We converted a lamp holder into a camera holder by attaching a usb-camera (1 in Figure 1) to it. Then we moved the usb-camera right next to the sewing machine (2 in Figure 1) to get a camera view of the sewn textile.

Figure 1: Setup

While the sewing machine is running, the usb-camera takes images of the textile with its sewn string. Each image is sent to a quality control system, which should recognize errors and breaks. So the assignment for the students is to create this quality control system.

Preparations

We decided to use a neural network to recognize breaks and errors from the images made by the usb-camera. For training the neural network the students needed to gather numerous training images.

For this purpose the students cut textile stripes, which can be seen in Figure 2. Then the students sewed seams along the textile stripes, which were recorded by the usb-camera system at the same time. The videos were saved as mp4 files.

Sewing machines produce errors or breaks only in rare cases. This is why we had to generate the errors and breaks ourselves.

Figure 2: Stripes

In Figure 3 you can see how we create errors and breaks. The left picture shows how an error is produced by sticking scissors under the seam, which widens the string. The right picture shows how we produce a break by cutting the string with the scissors. Therefore we already have two categories to distinguish: “errors” and “breaks”. From now on we call them attributes. There are two more attributes to come.

Figure 3: Prepare errors and breaks

On sewing machines you can set the distance between the stitches. Possible values are e.g. 2mm or 4mm. We decided to distinguish exactly these two values and defined the attribute “length” for this. If the attribute “length” is true, then we have a stitch distance of 2mm; if it is false, we have a stitch distance of 4mm.

The last attribute we call “good”. This simply means that we recognize the seam in the image as a good seam. If the attribute “good” is set to false, there is a problem with an “error”, “break” or “length”. This makes altogether four attributes: “good”, “error”, “break” and “length”.

Figure 4: Images of categories “good”, “error”, “break” and “length”

In Figure 4 you find an image for each attribute. On the left you find a “good” image. In the second picture from the left you see an “error”. In the second picture from the right you find a “break”. The rightmost picture shows a stitch distance of 4mm, which means the attribute “length” has the value false.

You might have noticed that the rightmost picture and the second picture from the right have the same stitch distance. So the second picture from the right has a “break” and a stitch distance of 4mm. This means the classifications are not exclusive. An image from the usb-camera can show all attributes as true, all attributes as false or any other combination.
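Because the attributes are not mutually exclusive, each image is described by four independent true/false values rather than by a single class number. The following is a minimal sketch of such an encoding, not code from the project; the helper name encode_attributes and the constant ATTRIBUTES are hypothetical, and the attribute order is an assumption that matches the order used later in this post.

```python
# Attribute order (assumed to match the lists used later in the post)
ATTRIBUTES = ["good", "error", "break", "length"]

def encode_attributes(flags):
    """Turn a dict of attribute flags into a 0/1 vector in ATTRIBUTES order.

    Attributes missing from the dict are treated as False.
    (Hypothetical helper for illustration only.)
    """
    return [int(bool(flags.get(name, False))) for name in ATTRIBUTES]

# A seam with a break and a stitch distance of 4mm ("length" is False)
break_4mm = encode_attributes({"break": True})
# A good seam with a stitch distance of 2mm ("length" is True)
good_2mm = encode_attributes({"good": True, "length": True})
print(break_4mm, good_2mm)
```

Any of the sixteen possible combinations can be written this way, which is why the problem is treated as multi-label classification.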

Creating labeled data

In Figure 1 you can see the setup of the sewing machine and the attached usb-camera. We used this setup to create videos while sewing seams on the textile stripes (Figure 2). Since errors and breaks do not happen very often, we prepared the textile stripes with the scissors and created new videos from them. Our goal was to have balanced data, which means that there is a good proportion of errors and breaks in our data set. We are not describing the code to create videos here. You could actually use any cell phone for this task.

However, we want to show how we labeled the training images extracted from the videos. The code below sets the basepath and assigns the path of the video file to the variable video. We also use a file whose name ends in _data.csv (called data.csv in the following) to store the values (true or false) of each attribute for every image. It is basically a lookup table. The filename is assigned to datacsv.

import os
import cv2
import pandas as pd

basepath = r"..."
videos = "videos"
videopath = os.path.join(basepath,videos)

videonamepre = "video_2021_05_05_09_34_57__2mm_Kombi"
videonameext = "mp4"
datacsv = videonamepre + "_data.csv"
videoname = videonamepre + "." + videonameext
video = os.path.join(videopath,videoname)

The list picnames contains the names of the attributes, see code below. The list picstats contains the true or false values of the attributes for one image. The variable picspath is the path of the directory where we store the images extracted from the videos.

picnames = ["good","error","break","length"]
picstats = [False, False, False, False]

picspath = os.path.join(basepath, "pics")

The function save_entry is shown below. It uses the Python library pandas to append the classification information (parameters name, picnames and picstats) to the data.csv file. First it reads in an already existing data.csv file, if there is one, and loads its content into the list datalist. Then it moves the name, picnames and picstats values into the dictionary dataitem and appends it to datalist. Finally the function save_entry converts datalist into a pandas data frame and saves the content into the data.csv file.

def save_entry(name, picnames, picstats):

    # Path of the lookup table that belongs to the current video
    csvpath = os.path.join(picspath, datacsv)
    datalist = []

    # Load the entries saved so far, if the lookup table already exists
    if os.path.isfile(csvpath):
        df = pd.read_csv(csvpath, index_col=0)
        datalist = df[['name'] + picnames].to_dict('records')

    # Append the new entry and write the whole table back to disk
    dataitem = {'name': name}
    dataitem.update(zip(picnames, picstats))
    datalist.append(dataitem)
    df = pd.DataFrame(datalist)
    df.to_csv(csvpath)

The code below is a small application to label the images of the previously created videos. First it opens the video file video with OpenCV’s VideoCapture constructor. The code then runs a loop to process the single images of the video. OpenCV’s function waitKey stops the code execution until a key is pressed. When the user presses the key n, the code reads in the next image of the video. On each pass it puts text with the classification information onto the image and displays it. Basically it displays the keys g for “good”, e for “error”, b for “break” and l for “length” together with their attribute values. The user can press the keys g, e, b and l to toggle the attribute values. If the user presses the s key, the application saves the attribute values with the function save_entry into the data.csv file and writes the image as a png file into the directory picspath. The key q quits the application.

count = 0
cap = cv2.VideoCapture(video)

if cap.isOpened():
    ret, frame = cap.read() 

while(cap.isOpened()):

    if ret == True:
        framedis = frame.copy()
        framedis = cv2.putText(framedis, str(count), (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255,255,255), 2, cv2.LINE_AA)
        framedis = cv2.putText(framedis, str(count), (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,0), 1, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[0] + " (g) " + str(picstats[0]), (35, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255,255,255), 3, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[0] + " (g) " + str(picstats[0]), (35, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,0), 1, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[1] + " (e) " + str(picstats[1]), (35, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255,255,255), 3, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[1] + " (e) " + str(picstats[1]), (35, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,0), 1, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[2] + " (b) " + str(picstats[2]), (35, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255,255,255), 3, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[2] + " (b) " + str(picstats[2]), (35, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,0), 1, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[3] + " (l) " + str(picstats[3]), (35, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255,255,255), 3, cv2.LINE_AA)
        framedis = cv2.putText(framedis, picnames[3] + " (l) " + str(picstats[3]), (35, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,0), 1, cv2.LINE_AA)
        
        cv2.imshow('frame',framedis)
    else:
        break
        
    key = cv2.waitKey(0) & 0xFF
    if key == ord('q'):
        break
    if key == ord('n'):
        ret, frame = cap.read()
        count += 1
    # The keys g, e, b and l toggle the attribute at the same position in picnames
    for i, togglekey in enumerate("gebl"):
        if key == ord(togglekey):
            picstats[i] = not picstats[i]

    if key == ord('s'):
        save_entry(videonamepre+f"_{count}.png", picnames, picstats)
        cv2.imwrite(os.path.join(picspath, videonamepre+f"_{count}.png"),frame)       
            
cap.release()
cv2.destroyAllWindows()

In Figure 5 you can see how the application displays the current image of a video. The user can change the attribute values by pressing one of the described keys and then save the image and its attribute values by pressing s.

Figure 5: Labeling

The students working on this project created around 15000 images and saved their attribute values with this application into csv files.
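Since the goal was balanced data, it can be useful to check how often each attribute is true in a lookup table. The sketch below is not part of the project code; the function name attribute_share and the example rows are made up, and it assumes the csv layout written by save_entry (an index column, a name column and one column per attribute, with booleans stored as the strings True and False).

```python
import csv
import io

def attribute_share(csvfile, attributes):
    """Return the fraction of rows in which each attribute is true."""
    counts = {name: 0 for name in attributes}
    total = 0
    for row in csv.DictReader(csvfile):
        total += 1
        for name in attributes:
            # Assumption: booleans are stored as the strings "True" / "False"
            if row[name].strip().lower() == "true":
                counts[name] += 1
    if total == 0:
        return {name: 0.0 for name in attributes}
    return {name: counts[name] / total for name in attributes}

# Made-up example rows in the assumed layout of a data.csv file
example = io.StringIO(
    ",name,good,error,break,length\n"
    "0,video_0.png,True,False,False,True\n"
    "1,video_1.png,False,True,False,True\n"
    "2,video_2.png,False,False,True,False\n"
    "3,video_3.png,True,False,False,False\n"
)
shares = attribute_share(example, ["good", "error", "break", "length"])
print(shares)
```

For a real file, the io.StringIO object would be replaced by an opened csv file, for example with open(path, newline="").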

Preprocessing the data

For training we need both training data and validation data, which we want to keep separate. We do this by creating two files with lookup tables, a train.csv file and a valid.csv file. Both files contain the image file name including its path and the attribute values. We also take a small number of images and assign them to a test data file which we call test.csv.

The function createcsv below creates a new csv file from a list piclist. The csv file name (datacsv) and the target directory (targetpath) are given as parameters. Each element of piclist contains an image filename and its attribute values.

def createcsv(datacsv, targetpath, piclist):

    datalist = []
    
    df = pd.DataFrame()
    
    for item in piclist:
        datalist.append({'name': item[0], classes[0]: item[1][classes[0]], classes[1]: item[1][classes[1]], classes[2]: item[1][classes[2]], classes[3]: item[1][classes[3]]})

    df = pd.DataFrame(datalist) 
    df.to_csv(os.path.join(targetpath, datacsv))

The code below opens, one by one, all csv files located inside the fullpathpic directory. It reads each file’s content and moves it into a pandas dataframe df. The code iterates through the content and moves each entry into the list picnamelist, provided the image file exists. So each entry of picnamelist contains the full path and filename of an image and its attribute values.

The list picnamelist is then shuffled with random’s shuffle function, and 20 percent of the picnamelist elements are moved into validlist. The code moves another two percent of the picnamelist elements into testlist, and finally the code moves the remaining content into trainlist (around 78 percent). The code subsequently calls the function createcsv with validlist, testlist and trainlist as parameters to store the lookup tables in valid.csv, test.csv and train.csv.

picnamelist = []
attributelist = []
validlist = []
testlist = []
trainlist = []

for file in os.listdir(fullpathpic ):
    if file.endswith(".csv"):
        # Pass the path directly so that pandas opens and closes the file itself
        df = pd.read_csv(os.path.join(fullpathpic, file), index_col = 0)
        
        for index, row in df.iterrows():
            picname = row["name"]
            
            if os.path.isfile(os.path.join(fullpathpic, picname)):
                picnamelist.append([os.path.join(fullpathpic, picname), row])


random.shuffle(picnamelist)

num = 20*len(picnamelist) // 100
validlist = picnamelist[:num]
testlist = picnamelist[num:num+num//10]
trainlist =  picnamelist[num+num//10:]

createcsv("valid.csv", basepath, validlist)
createcsv("test.csv", basepath, testlist)
createcsv("train.csv", basepath, trainlist)
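The split described above can also be written as a small reusable function. This is only a sketch under assumptions: the function name split_dataset and its parameters valid_pct, test_pct and seed are hypothetical, and the optional seed (which the code above does not use) is added so that the split can be reproduced.

```python
import random

def split_dataset(items, valid_pct=20, test_pct=2, seed=None):
    """Shuffle a copy of items and split it into (valid, test, train) lists."""
    items = list(items)
    # A private random generator keeps the global random state untouched
    random.Random(seed).shuffle(items)
    num_valid = valid_pct * len(items) // 100
    num_test = test_pct * len(items) // 100
    valid = items[:num_valid]
    test = items[num_valid:num_valid + num_test]
    train = items[num_valid + num_test:]
    return valid, test, train

# Integers stand in for the entries of picnamelist
valid, test, train = split_dataset(range(15000), seed=42)
print(len(valid), len(test), len(train))
```

The three lists are disjoint slices of one shuffled list, so no image can end up in more than one of the lookup tables.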

Training

The code below defines the basepath, and the location of the lookup tables for the training, validation and testing data (traincsv, validcsv and testcsv). The code stores the model into the path model_path. The model filename is modelsaved. The list classes contains the attributes, like picnames above (note that the code below and above are different code files).

The code reads both lookup tables with the pandas read_csv function and assigns the number of their rows, i.e. the number of training images and validation images, to lentrain and lenvalid.

basepath = r"..."
traincsv = os.path.join(basepath, "train.csv")
validcsv = os.path.join(basepath, 'valid.csv')
testcsv = os.path.join(basepath, 'test.csv')
model_path = os.path.join(basepath, 'models')

classes = ["good","error","break","length"]

now = datetime.datetime.now()

modelsaved = "model.h5"

lentrain = pd.read_csv(traincsv, index_col = 0).shape[0]
lenvalid = pd.read_csv(validcsv, index_col = 0).shape[0]

We have described the function generatebatchdata in a previous post, so we don’t go into too much detail. As a parameter we pass the path of a previously created lookup table, which the function opens with the pandas read_csv function. The code moves the filenames into the list filenames and their attribute values into the list classnumbers. During training, batchsize elements at a time are taken from both lists filenames and classnumbers; the images are opened with OpenCV’s function imread, resized and returned to the training process together with their attribute values. Around 70 percent of the images are augmented by changing their brightness and contrast with OpenCV’s convertScaleAbs function. Each batch is finally normalized by subtracting its mean and dividing by its standard deviation.

def generatebatchdata(batchsize, datacsv, classes):

    filenames = []
    classnumbers = []
    
    df = pd.read_csv(datacsv, index_col = 0) 
        
    for index, row in df.iterrows():
        filenames.append(row["name"])
        classnumbers.append([int(row[classes[0]]), int(row[classes[1]]), int(row[classes[2]]), int(row[classes[3]])])
           
    while True:
        batchstart = 0
        batchend = batchsize    
        
        while batchstart < len(filenames):
            
            imagelist = []
            classlist = []
            
            limit = min(batchend, len(filenames))

            for i in range(batchstart, limit):
                img = np.zeros((dim[0], dim[1],3), 'uint8')
                img = cv2.resize(cv2.imread(filenames[i],cv2.IMREAD_COLOR ), dim, interpolation = cv2.INTER_AREA)
                if random.random() > 0.3:
                    alpha = 0.8 + 0.4*random.random()
                    beta = int(random.random()*15)
                    img = cv2.convertScaleAbs(img, alpha=alpha, beta=beta)

                imagelist.append(img)
                classlist.append(classnumbers[i])


            train_data = np.array(imagelist, dtype=np.float32)
            train_data -= train_data.mean()
            train_data /= train_data.std()
            train_class= np.array(classlist, dtype=np.float32)

            yield (train_data,train_class)    

            batchstart += batchsize   
            batchend += batchsize
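To make the brightness and contrast augmentation more concrete, here is a plain Python model of what convertScaleAbs does to a single 8-bit pixel value: it scales by alpha, adds beta, takes the absolute value and saturates the result to the range 0 to 255. The helper scale_abs is hypothetical and only meant as an illustration; the real function works on whole images.

```python
def scale_abs(pixel, alpha, beta):
    """Per-pixel model of cv2.convertScaleAbs for one 8-bit value:
    scale, shift, take the absolute value, then saturate to 0..255."""
    return min(255, int(round(abs(alpha * pixel + beta))))

# In generatebatchdata alpha is drawn from [0.8, 1.2) and beta from 0..14
darker = scale_abs(100, alpha=0.8, beta=0)      # contrast reduced
brighter = scale_abs(100, alpha=1.2, beta=14)   # contrast and brightness raised
saturated = scale_abs(250, alpha=1.2, beta=14)  # clipped at the 8-bit maximum
print(darker, brighter, saturated)
```

So alpha acts as a contrast factor and beta as a brightness offset, and very bright pixels are clipped rather than wrapped around.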

We create two generators with generatebatchdata: generator_train and generator_valid. We have set the batch size for training to 20 and for validation to one.

batchsizetrain = 20
batchsizevalid = 1

generator_train = generatebatchdata(batchsizetrain, traincsv , classes)
generator_valid = generatebatchdata(batchsizevalid, validcsv, classes)

The functions relu_bn, residual_block and create_res_net in the code below create a residual neural network (ResNet). Dorian Lazar supplied the code on GitHub. We are not going too much into detail, but create_res_net implements a ResNet from elements as shown in Figure 6. There you see one residual block for the case that the downsample parameter is set to true. The function create_res_net appends several such blocks to form one complete ResNet.

Figure 6: Residual Block

We slightly modified the code at the end of the function create_res_net. As output we have a dense layer with four neurons, which is the length of the list classes (and also the number of attributes we use). As the last activation function we use the sigmoid function. Using the sigmoid function in combination with the binary_crossentropy loss function is common practice for multi-label classification problems.

def relu_bn(inputs: Tensor) -> Tensor:
    relu = ReLU()(inputs)
    bn = BatchNormalization()(relu)
    return bn

def residual_block(x: Tensor, downsample: bool, filters: int, kernel_size: int = 3) -> Tensor:
    y = Conv2D(kernel_size=kernel_size,
               strides= (1 if not downsample else 2),
               filters=filters,
               padding="same")(x)
    y = relu_bn(y)
    y = Conv2D(kernel_size=kernel_size,
               strides=1,
               filters=filters,
               padding="same")(y)

    if downsample:
        x = Conv2D(kernel_size=1,
                   strides=2,
                   filters=filters,
                   padding="same")(x)

    out = Add()([x, y])
    out = relu_bn(out)
    return out

def create_res_net():
    
    inputs = Input(shape=(dim[0], dim[1], 3))
    num_filters = 64
    
    t = BatchNormalization()(inputs)
    t = Conv2D(kernel_size=3,
               strides=1,
               filters=num_filters,
               padding="same")(t)
    t = relu_bn(t)
    
    num_blocks_list = [2, 4, 2]
    for i in range(len(num_blocks_list)):
        num_blocks = num_blocks_list[i]
        for j in range(num_blocks):
            t = residual_block(t, downsample=(j==0 and i!=0), filters=num_filters)
        num_filters *= 2
    
    t = AveragePooling2D(4)(t)
    t = Flatten()(t)
    outputs = Dense(len(classes), activation='sigmoid')(t)
    
    model = Model(inputs, outputs)

    return model

The code below creates a model with the function create_res_net and assigns it to the variable model. The variable model is compiled with the binary_crossentropy loss function. The summary method shows the structure of the model.

model = create_res_net()  
model.compile(Adam(learning_rate=.00001), loss="binary_crossentropy", metrics=['accuracy'])
model.summary()
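The combination of a sigmoid output layer and the binary_crossentropy loss can be illustrated with a few lines of plain Python. This is a sketch of the underlying arithmetic for a single image, not the Keras implementation; the function names and the example numbers are made up.

```python
import math

def sigmoid(z):
    """Squash one raw output into a probability between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))

def binary_crossentropy(y_true, y_prob, eps=1e-7):
    """Mean binary cross-entropy over the attributes of one image."""
    total = 0.0
    for t, p in zip(y_true, y_prob):
        p = min(max(p, eps), 1.0 - eps)  # avoid log(0)
        total += -(t * math.log(p) + (1 - t) * math.log(1.0 - p))
    return total / len(y_true)

# Made-up raw outputs of the four output neurons for one image
logits = [-2.0, 3.0, 2.5, -1.0]
probs = [sigmoid(z) for z in logits]
# Label in the order good, error, break, length: error and break are both true
label = [0, 1, 1, 0]
loss = binary_crossentropy(label, probs)
print([round(p, 3) for p in probs], round(loss, 3))
```

Each output is judged independently, so the four probabilities do not have to sum to one and several attributes can be predicted as true at the same time.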

For training and validation you need to specify the number of steps: stepstrainimages and stepsvalidimages. We calculate them by dividing the number of training images by the batch size for training and the number of validation images by the batch size for validation.

stepstrainimages = lentrain//batchsizetrain
stepsvalidimages = lenvalid//batchsizevalid
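The integer division drops a partial last batch from the step count. The following sketch shows the difference between rounding down and rounding up; the function steps_per_epoch, its parameter keep_partial_batch and the image count are made up for illustration.

```python
import math

def steps_per_epoch(num_images, batchsize, keep_partial_batch=False):
    """Number of generator batches that make up one epoch."""
    if keep_partial_batch:
        return math.ceil(num_images / batchsize)
    return num_images // batchsize

# Made-up image count that is not a multiple of the batch size
full_batches = steps_per_epoch(11710, 20)
all_batches = steps_per_epoch(11710, 20, keep_partial_batch=True)
print(full_batches, all_batches)
```

Because generatebatchdata loops forever, rounding down does not lose images for good; the shorter last batch is simply consumed as the first step of the following epoch.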

Below, the code trains the model with its fit method. Parameters are the generators generator_train and generator_valid. The number of steps (stepstrainimages and stepsvalidimages) needs to be given as well. After execution, the fit method returns a history object, from which the code reads the final loss, val_loss, accuracy and val_accuracy values. They are used to build the checkpoint file name modelsaved. By looking at the checkpoint’s file name, we get an indication of the quality of the training. The code saves the checkpoint with the save_weights method.

hist = model.fit(generator_train,steps_per_epoch=stepstrainimages, epochs=10, validation_data=generator_valid, validation_steps=stepsvalidimages)

tl=hist.history['loss'][-1]
vl=hist.history['val_loss'][-1]
ta=hist.history['accuracy'][-1]
va=hist.history['val_accuracy'][-1]

modelsaved = f"model_{net}_{tl:.2f}_{vl:.2f}_{ta:1.3f}_{va:1.3f}.h5"
model.save_weights(os.path.join(model_path,modelsaved))

After training we compared the training losses with the validation losses. We found that the training losses are smaller than the validation losses, which indicates overfitting. In the result section below we show the prediction results with testing data.

Result

We created a lookup table not only for training data and validation data, but also for testing data in the file test.csv. We set aside around 300 images for this. Then we ran these images through the predict method of model and compared the results with the attribute values in the lookup table. We are not showing the code here. Below you find the percentages of correct answers for the attributes “good”, “error”, “break” and “length”. We see that “length” reaches 100 percent correctness (the attribute “length” was predicted correctly for all images), while “error” and “break” have a prediction accuracy of around 97 percent. Only the “good” attribute has a lower accuracy. Our explanation is that the students’ decision for “good” during labeling can be very error-prone: six students can label the attribute “good” very subjectively, while the decision to label the attributes “error”, “break” and “length” is much clearer to make.

Good Accuracy: 82.70270270270271 
Error Accuracy: 97.2972972972973 
Break Accuracy: 97.02702702702702 
Length Accuracy: 100.
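The evaluation code is not shown in this post. One way such per-attribute percentages could be computed is sketched below; it is not the code used for the numbers above. The function name attribute_accuracy, the decision threshold of 0.5 and the example values are assumptions.

```python
def attribute_accuracy(y_true, y_prob, classes, threshold=0.5):
    """Percentage of images for which each attribute is predicted correctly."""
    result = {}
    for k, name in enumerate(classes):
        correct = sum(
            int((prob[k] >= threshold) == bool(true[k]))
            for true, prob in zip(y_true, y_prob)
        )
        result[name] = 100.0 * correct / len(y_true)
    return result

classes = ["good", "error", "break", "length"]
# Made-up labels and sigmoid outputs for three images
y_true = [[1, 0, 0, 1], [0, 1, 0, 1], [0, 0, 1, 0]]
y_prob = [[0.9, 0.1, 0.2, 0.8], [0.6, 0.7, 0.1, 0.9], [0.2, 0.1, 0.8, 0.3]]
accuracy = attribute_accuracy(y_true, y_prob, classes)
for name in classes:
    print(f"{name} accuracy: {accuracy[name]}")
```

In this made-up example only the second image is wrong for “good” (predicted 0.6, labeled false), so “good” scores two out of three while the other attributes are all correct.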

To do further validation on the test images, we created heatmaps. A heatmap can point out the pixels which lead to the neural network’s classification decision. We have described the code for creating heatmaps in another post, so we will not show it here again.

Figure 7 shows four heatmaps. The leftmost picture correctly shows an error in blue. This means that the model correctly picks out the pixels around the seam error, which led to the right classification decision. The same is true for the second picture from the left. Here we have a break, and the heatmap correctly shows the pixels of the break in green. Since we have multi-label classification, there are cases where we have an error and a break at the same time. The second picture from the right shows this case. Here, too, the decision was made correctly, with the pixels around the break in green and those around the error in blue. The rightmost picture shows in red the pixels leading to the length decision.

Figure 7: Heatmaps

One last remark. The generatebatchdata function we described above did some data augmentation by changing the brightness and the contrast of around 70 percent of all images. Actually we did even more data augmentation, which was not shown here. The 15000 images were doubled to 30000 images by modifying them in the following way: we took out a portion from the top of the image and appended it to the bottom of the image. Figure 8 illustrates how we did this. These are simple OpenCV functions, so we leave out the code here.

Figure 8: Data Augmentation
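The idea of this second augmentation can be sketched on a tiny image stored as a list of rows. This is an illustration only, not the OpenCV code used in the project; the function name move_top_to_bottom and the size of the moved portion are assumptions.

```python
def move_top_to_bottom(rows, cut):
    """Return a new image in which the first `cut` rows are appended at the bottom."""
    return rows[cut:] + rows[:cut]

# A tiny made-up "image" with four rows of two pixels each
image = [[1, 1], [2, 2], [3, 3], [4, 4]]
shifted = move_top_to_bottom(image, 1)
print(shifted)
```

The image keeps its size; only the row order changes. For a numpy array, as returned by OpenCV’s imread, np.roll(img, -cut, axis=0) produces the same row order.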

Acknowledgement

Special thanks to the class of Summer Semester 2021 Forschungsprojekt Industrie 4.0 for providing the 15000 images of training data used for the neural network training. We appreciate this very much, and we know how much effort you have put into this.

Also special thanks to the University of Applied Science Albstadt-Sigmaringen for hosting the learn-factory and providing the appliances to enable this research.

Gradient-weighted Class Activation Mapping with fruit images

In a previous blog post we documented methods and code for recognizing fruits with a neural network on a Raspberry Pi. This time we want to go one step further and describe the method and code to create heatmaps for fruit images. The method we use here is called Gradient-weighted Class Activation Mapping (Grad-CAM). A heatmap tells us which pixels of a fruit image lead to the neural network’s decision to assign the input image to a specific class.

We believe that heatmaps are very useful information, especially during validation after a neural network has been trained. We want to know whether the neural network is really making the right decision based on the given information (such as a fruit image). A good example is the classification of wolf and husky images made by researchers (“Why Should I Trust You?”, see Reference). The researchers actually had pretty good results, until somebody figured out that most wolf images were taken in a snowy environment, while the husky images were not. The neural network mostly associated a snowy environment with a wolf. A husky was therefore classified as a wolf when the image was taken with snow in the background.

For a deeper neural network validation, we can use heatmaps to see how the classification decision was made. Below we will show you how we generate heatmaps from fruit images. The Keras website helped us a lot to write our code, see also Reference.

The Setup

We use three different classes of images: Apfel (apple) images, Orange (orange) images and Tomate (tomato) images, see Figure 1. The list classes, see code below, contains strings describing the classes. We filled the training, validation and test directories with fruit images similar to those in Figure 1. Each class of fruit images went into its own directory named Apfel, Orange and Tomate.

Figure 1: Images of an apple, an orange and a tomato

In the code below we use the paths traindir, validdir and testdir. The weights and the structure of the neural network model (modelsaved and modeljson) are saved into the model directory.

The classes Apfel, Orange and Tomate are mapped to numbers with the Python dictionary classnum. The variable dim defines the size of the images.

basepath = "/home/....../Session1"
traindir = os.path.join(basepath, "pics" , "train")
validdir = os.path.join(basepath, "pics" , 'valid')
testdir = os.path.join(basepath, "pics" , 'test')
model_path = os.path.join(basepath, 'models')

classes = ["Apfel","Orange","Tomate"]
classnum = {"Apfel":0, "Orange":1, "Tomate":2}

net = 'convnet'

now = datetime.datetime.now()

modelweightname = f"model_{net}_{now.year}-{now.month}-{now.day}_callback.h5"
modelsaved = f"model_{net}.h5"
modeljson = f"model_{net}.json"


dim = (100,100, 3)

The Model

The following code shows the convolutional neural network (CNN) code in Keras. We have four convolutional layers and two dense layers. The last dense layer has three neurons, one for each class. The output of each neuron indicates how strongly an image is predicted to be an apple, an orange or a tomato. Since the classes are exclusive (an apple cannot be a tomato), we use the softmax activation function for the last layer.

def create_conv_net():
    model = Sequential()
    model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same', input_shape=(dim[0], dim[1], 3)))
    model.add(BatchNormalization())
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D((2, 2)))
    model.add(Flatten())
    model.add(Dense(128, activation='relu', kernel_initializer='he_uniform'))
    model.add(Dense(len(classes), activation='softmax'))
    #opt = Adam(lr=1e-3, decay=1e-3 / 200)
    return model

The function create_conv_net creates the model. It is compiled with the categorical_crossentropy loss function, see code below.

model = create_conv_net()
model.compile(Adam(learning_rate=.00001), loss="categorical_crossentropy", metrics=['accuracy'])
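For exclusive classes, the softmax activation and the categorical_crossentropy loss work together as sketched below in plain Python. This shows the arithmetic for one image and is not the Keras implementation; the function names and the example numbers are made up.

```python
import math

def softmax(logits):
    """Turn raw outputs into probabilities that sum to one."""
    shifted = [z - max(logits) for z in logits]  # for numerical stability
    exps = [math.exp(z) for z in shifted]
    total = sum(exps)
    return [e / total for e in exps]

def categorical_crossentropy(onehot, probs, eps=1e-7):
    """Cross-entropy for one image with a one-hot label."""
    return -sum(t * math.log(max(p, eps)) for t, p in zip(onehot, probs))

classes = ["Apfel", "Orange", "Tomate"]
logits = [2.0, 0.5, -1.0]  # made-up raw outputs of the last dense layer
probs = softmax(logits)
predicted = classes[probs.index(max(probs))]
loss = categorical_crossentropy([1, 0, 0], probs)
print(predicted, [round(p, 3) for p in probs], round(loss, 3))
```

Unlike independent sigmoid outputs, the softmax probabilities always sum to one, so raising the score of one class lowers the others.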

The focus here is to describe the Grad-CAM method, not the training of the model. Therefore we leave out the specifics of training. More details on training can be found here. The code below loads a previously prepared model checkpoint with the Keras method load_weights and shows the structure of the model with the summary method. This is useful information, because we have to look for the last convolutional layer. It is needed later.

model.load_weights(os.path.join(model_path,"model_convnet_2021-7-1_callback.h5"))
model.summary()

Now we have to create a new model with two output layers. In Figure 2 you see a simplified representation of the CNN we use. The last layer (in orange) represents the dense layer. The last CNN layer is shown in green. Due to the filters of the CNN layer we have numerous channels. The Grad-CAM method requires the outputs of these channels. So we need to create a new model, based on the one above, which outputs both the channel images of the last CNN layer and the classification decision from the dense layer.

Figure 2: Simplified CNN

The code below iterates through the layers of the CNN model in reverse order and finds the last batch_normalization layer. We modeled the neural network in such a way that each CNN layer is followed by a batch_normalization layer, so we pick the batch_normalization layer as an output. The code produces a new model gradModel with model’s input as input, and with model’s last CNN layer (actually the last batch_normalization layer) and last dense layer as outputs.

gradModel = None

for layer in reversed(model.layers):
    if "batch_normalization" in layer.name:
        print(layer.name)
        gradModel = Model(inputs=model.inputs, outputs=[model.get_layer(layer.name).output, model.output])
        break

The Grad-CAM method

The function getGrad below executes the model gradModel with an image given as the parameter img. It obtains the results (conv, predictions) of the last CNN layer (actually batch_normalization) and of the last dense layer. We are now interested in the gradients of the predicted score of a specific class with respect to the channel images returned from the last CNN layer. The dictionary classnum (defined above) maps the class name to a class number, which selects the corresponding entry of predictions; the code calls this entry loss, see command below.

loss = predictions[:, classnum[classname]]

The gradients of this class score with respect to the output of the last CNN layer are calculated with tape’s gradient method. The function getGrad returns the input image, the output of the last CNN layer (conv) and the gradients (grads).

def getGrad(img, classname):

    testpics = np.array([img], dtype=np.float32)

    with tf.GradientTape() as tape:

        conv, predictions =  gradModel(tf.cast(testpics, tf.float32))
        
        loss = predictions[:, classnum[classname]]

    grads = tape.gradient(loss, conv)

    return img, conv, grads

The function getHeatMap below creates a heatmap from the output of the image’s last CNN layer and its gradients. Inside getHeatMap, the Tensorflow method reduce_mean takes the mean value of grads for each channel (pooled_grads). The mean value is an indication of how important the output of a channel of the last CNN layer is. The function multiplies the mean values with the corresponding CNN layer outputs (convpic) and sums up the results into heatmap. Since we want to have an image to look at, the function getHeatMap rectifies, normalizes and resizes the heatmap before returning it.

def getHeatMap(conv, grads):
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    convpic = conv[0]
    heatmap = convpic @ pooled_grads[..., tf.newaxis]
    heatmapsq = tf.squeeze(heatmap)
    heatmapnorm = tf.maximum(heatmapsq, 0) / tf.math.reduce_max(heatmapsq)
    heatmapnp = heatmapnorm.numpy()
    heatmapresized = cv2.resize(heatmapnp, (dim[0], dim[1]), interpolation = cv2.INTER_AREA)*255
    heatmapresized = heatmapresized.astype("uint8")
    return heatmapresized
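The steps of getHeatMap correspond to the formulas of the Grad-CAM method. The symbols below follow the usual Grad-CAM notation and are not used elsewhere in this post: A^k is the k-th channel of the last CNN layer output (conv), y^c is the prediction for class c (called loss in getGrad), and Z is the number of pixels per channel.

```latex
\alpha_k^c = \frac{1}{Z} \sum_{i} \sum_{j} \frac{\partial y^c}{\partial A_{ij}^k},
\qquad
L_{ij}^c = \mathrm{ReLU}\!\left( \sum_{k} \alpha_k^c \, A_{ij}^k \right)
```

Here the weights \alpha_k^c correspond to pooled_grads and the sum over k to the matrix product with convpic. The division by the maximum and the resizing in getHeatMap only serve to turn L^c into an 8-bit image of the input size.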

Testing the images

The apple, orange and tomato images are stored in separate directories. So the function testgrads, see code below, loads the image paths from the test directory into the lists pics0, pics1 and pics2. They are then concatenated into the list pics.

The function testgrads normalizes the list of images and predicts their classifications (variable predictions). For each image, Numpy’s argmax function outputs a number which indicates whether the image is predicted as an apple, an orange or a tomato, and the result is moved into pos. The function then calls the getGrad function and the getHeatMap function to obtain a heatmap for the image. Finally the heatmap, which is a grayscale image, is converted into a color image (apple images in green, orange images in blue and tomato images in red). The function testgrads then returns the list of images and a list of colored heatmaps, one for each image.

def testgrads(picdir):

    pics0 = [os.path.join(picdir, classes[0], f) for f in os.listdir(os.path.join(picdir, classes[0]))]
    pics1 = [os.path.join(picdir, classes[1], f) for f in os.listdir(os.path.join(picdir, classes[1]))]
    pics2 = [os.path.join(picdir, classes[2], f) for f in os.listdir(os.path.join(picdir, classes[2]))]
  
    pics = pics0 + pics1 + pics2

    imagelist = []
    
    for pic in pics:
        img = np.zeros((dim[0], dim[1],3), 'uint8')
        img = cv2.resize(cv2.imread(pic,cv2.IMREAD_COLOR ), (dim[0], dim[1]), interpolation = cv2.INTER_AREA)
        imagelist.append(img)
    
    train_data = np.array(imagelist, dtype=np.float32)
    
    train_data -= train_data.mean()
    train_data /= train_data.std()
    
    predictions = model.predict(train_data)
    
    heatmaps = []
    
    for i in range(len(train_data)):   
        heatmapc = np.zeros((dim[0], dim[1],3), 'uint8')
        pos = np.argmax(predictions[i])
        img, conv, grads = getGrad(train_data[i], classes[pos])
        heatmap =  getHeatMap(conv, grads)
        
        if pos == 0:
            posadj = 1
        elif pos == 1:
            posadj = 0
        elif pos == 2:
            posadj = 2    
        
        heatmapc[:,:,posadj] = heatmap[:,:]    
        
        heatmaps.append(heatmapc)
    
            
    return imagelist, heatmaps

The code below calls the function testgrads. The parameter testdir is the path of the test images.

imagelist, heatlist = testgrads(testdir)

Result

In Figure 3 you find three images (apple, orange and tomato) that were passed through the testgrads function (top row). The middle row shows the outputs of testgrads, the visualized heatmaps. The bottom row shows the original images merged with the heatmaps using OpenCV's addWeighted method. The heatmap pixels indicate which groups of pixels of the original image led to the prediction. The black heatmap pixels show that the background pixels do not contribute to the decision. The same is true for the fruit stems.

Figure 3: Original and heatmap images
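The merging of an image with its colored heatmap can be sketched in NumPy as a weighted sum that is clipped to the 8-bit range. The weights of 0.5 and the function name blend are assumptions for illustration; the weights used for Figure 3 are not stated in this post.

```python
import numpy as np

def blend(image, heatmap, alpha=0.5, beta=0.5):
    """Weighted sum of two uint8 images of equal shape, clipped to 0..255."""
    mixed = alpha * image.astype(np.float32) + beta * heatmap.astype(np.float32)
    return np.clip(np.rint(mixed), 0, 255).astype(np.uint8)

# Toy data (hypothetical): a uniform image and a heatmap in the red channel (BGR order).
image = np.full((2, 2, 3), 200, dtype=np.uint8)
heat = np.zeros((2, 2, 3), dtype=np.uint8)
heat[:, :, 2] = 100

overlay = blend(image, heat)
print(overlay[0, 0])
```

Channels without heatmap content are simply darkened by the image weight, which is why the background of the merged images appears dimmer than in the originals.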

References

Keras: https://keras.io/examples/vision/grad_cam/

pyimagesearch: https://www.pyimagesearch.com/2020/03/09/grad-cam-visualize-class-activation-maps-with-keras-tensorflow-and-deep-learning/

Why should I trust you: https://arxiv.org/pdf/1602.04938.pdf

Fruit Recognition: https://www3.hs-albsig.de/wordpress/point2pointmotion/2020/03/26/fruit-recognition-on-a-raspberry-pi/

Barcode detection with a neural network

In late 2020 we taught a class called Embedded Systems, which also includes a semester assignment. Due to the overall situation in 2020 it was difficult to run it at the university, so we decided to issue the semester assignment as homework. One of the assignment topics was the detection of barcodes with a neural network. Such an assignment is not far-fetched for an embedded systems class, because embedded systems capable of running neural networks, such as the Jetson Nano from NVIDIA, are available nowadays.

The idea is to have a camera take a stream of images and show it on the display as a video. As soon as an object with a barcode shows up in the scene, the pixels in the area of the barcode are highlighted and the content of the barcode is shown above the area.

Many of the topics in this post were already covered before, so we refer to previous posts where appropriate, e.g. during the explanation of the published code.

The first task for the students was to collect images with barcodes. Each student had to take 500 photographs. They collected them from everywhere, such as products in grocery stores, kitchens, bathrooms etc. Since seven students took this assignment, we ended up with 3500 images with barcodes.

Labeling

The next task of the assignment was to label the images. The program labelme is an application in which a user can label areas of an image by drawing polygons around objects, see Figure 1. The user can save the labeled image to a json file. The json file contains the complete image in base64-encoded form and the vertices of the polygons.

Figure 1: Labeling with Labelme

The students did this for each image, so we had 3500 labeled images in json files, which we stored in the directory fullpathdata, see code below. The code also defines further global variables which are used throughout this post.

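import os
import json
import base64
import shutil
from random import random, randint, shuffle

import cv2
import numpy as np
from skimage.draw import polygon  # assumed source of polygon(), which createMasks uses to rasterize the labels

dim = (128, 128)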

path =  r'.../Dokumente/Barcode'
dirjson = "train/json"
dirimages = "train/images"
dirmasks = "train/masks"

dirjsonvalid = "valid/json"
dirimagesvalid = "valid/images"
dirmasksvalid = "valid/masks"

dirjsontest = "test/json"
dirimagestest = "test/images"
dirmaskstest = "test/masks"

dirmodels = "models"
modelname = "model-bar.h5"
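modelweightname = "model-bar-check.h5"
modelnamejson = "model-bar.json"  # assumed file name; the variable is used below to save the model structure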

dirchecks = "checks"
dirdata = "data/jsons"

fullpathdata = os.path.join(path, dirdata)

fullpathjson = os.path.join(path, dirjson)
fullpathimages = os.path.join(path, dirimages)
fullpathmasks = os.path.join(path, dirmasks)

fullpathjsonvalid = os.path.join(path, dirjsonvalid)
fullpathimagesvalid = os.path.join(path, dirimagesvalid)
fullpathmasksvalid = os.path.join(path, dirmasksvalid)

fullpathjsontest = os.path.join(path, dirjsontest)
fullpathimagestest = os.path.join(path, dirimagestest)
fullpathmaskstest = os.path.join(path, dirmaskstest)

fullpathchecks = os.path.join(path, dirchecks)
fullpathmodels = os.path.join(path, dirmodels)  # used below when saving and loading the model

Data creation

In the next task we sorted the data into three directories: fullpathjson, fullpathjsonvalid and fullpathjsontest. We decided to use 78% of the json files as training data, 20% as validation data, and the remaining 2% as test data.

The code below lists the json files in fullpathdata, shuffles the filenames and distributes them over the lists validlist, testlist and trainlist.

jsonlist = [os.path.join(filename) for filename in os.listdir(fullpathdata) if filename.endswith(".json") ]

shuffle(jsonlist)

num = 20*len(jsonlist) // 100
validlist = jsonlist[:num]
testlist = jsonlist[num:num+num//10]
trainlist =  jsonlist[num+num//10:]
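The integer arithmetic of the split can be checked with a small helper. This is only a sketch: the function name split_counts is made up, and the total of 3500 assumes that no json file was removed beforehand.

```python
def split_counts(total):
    """Return (train, valid, test) sizes for the slicing scheme used above."""
    num_valid = 20 * total // 100        # 20% validation
    num_test = num_valid // 10           # a tenth of that, i.e. 2% test
    num_train = total - num_valid - num_test
    return num_train, num_valid, num_test

print(split_counts(3500))  # (2730, 700, 70)
```

With 3500 files this gives 2730 training, 700 validation and 70 test files, which corresponds to the 78%, 20% and 2% stated above.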

Finally the json files are copied into the fullpathjson, fullpathjsonvalid and fullpathjsontest directories, see code below.

for filename in trainlist:
    shutil.copy(os.path.join(fullpathdata, filename), os.path.join(fullpathjson, filename))
    
for filename in validlist:
    shutil.copy(os.path.join(fullpathdata, filename), os.path.join(fullpathjsonvalid, filename))
    
for filename in testlist:
    shutil.copy(os.path.join(fullpathdata, filename), os.path.join(fullpathjsontest, filename))

During training we often find out that one of the images is corrupted. This is sometimes hard to see directly in the error messages of the training process, so it makes sense to check the data in advance. The function testMasks below opens the json files, loads the data, decodes the images and reads the polygons ("shapes"). Each image must have exactly one label, which we confirm with an assert statement. If the assertion fails, the function stops executing and we remove the offending json file. It is like sorting out rotten fruit.

def testMasks(sourcejsonsdir):
    count = 0
    directory = sourcejsonsdir
    for filename in os.listdir(directory):
        if filename.endswith(".json"):
            print("{}:{}".format(count,os.path.join(directory, filename)))
            f = open(os.path.join(directory, filename))
            data = json.load(f)

            img_arr = data['imageData']  
            imgdata = base64.b64decode(img_arr)

            img = cv2.imdecode(np.frombuffer(imgdata, dtype=np.uint8), flags=cv2.IMREAD_COLOR)

            assert (len(data['shapes']) == 1)

            for shape in data['shapes']:
                print(shape['label'])
                
            count += 1
            
            f.close()

Below we execute the function testMasks against the directories fullpathjson, fullpathjsonvalid and fullpathjsontest.

testMasks(fullpathjson)
testMasks(fullpathjsonvalid)
testMasks(fullpathjsontest)
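Because testMasks stops at the first failing file, it has to be rerun after every removal. A possible alternative is to collect all suspicious files in one pass. The sketch below uses only the standard library; the function name find_bad_labels and the demo files are hypothetical, and unlike testMasks it does not decode the image data.

```python
import json
import os
import tempfile

def find_bad_labels(sourcejsonsdir):
    """Return names of json files that cannot be parsed or do not hold exactly one shape."""
    bad = []
    for filename in sorted(os.listdir(sourcejsonsdir)):
        if not filename.endswith(".json"):
            continue
        try:
            with open(os.path.join(sourcejsonsdir, filename)) as f:
                data = json.load(f)
            ok = bool(data.get("imageData")) and len(data.get("shapes", [])) == 1
        except (OSError, ValueError, AttributeError):
            ok = False
        if not ok:
            bad.append(filename)
    return bad

# Small demonstration with two made-up files in a temporary directory.
with tempfile.TemporaryDirectory() as tmpdir:
    good = {"imageData": "aGVsbG8=",
            "shapes": [{"label": "barcode", "points": [[0, 0], [1, 0], [1, 1]]}]}
    bad = {"imageData": "aGVsbG8=", "shapes": []}
    for name, content in (("good.json", good), ("bad.json", bad)):
        with open(os.path.join(tmpdir, name), "w") as f:
            json.dump(content, f)
    bad_files = find_bad_labels(tmpdir)

print(bad_files)  # ['bad.json']
```

The returned list can then be reviewed or removed in one go before the masks are created.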

The images taken by the students are not square. The model we use, however, needs square images for training. The function getRect below selects a square within the original image and returns four values: the coordinates of the upper left corner (ld, lw) and the edge length of the square twice (the same value for both sides). The upper left corner is chosen randomly within the limits of the image, and the edge length is set to 90% of the smaller of the two image dimensions. Note that img.shape[0] is the number of rows and img.shape[1] the number of columns, so the variable called width in the code holds the image height and vice versa. Because getFrameRGB and getFrameGrey use lw as row offset and ld as column offset, the crop is still correct.

def getRect(img):
    
    width = img.shape[0]
    height = img.shape[1]
    lw = 0
    ld = 0
    side = 0
    
    if height > width:
    
        widthscale = int(0.9*width)
        left = width - widthscale
        lw = randint(0, left)
        down = height - widthscale
        ld = randint(0, down)
        side = widthscale
    
    else:
    
        heightscale = int(0.9*height)
        down = height - heightscale
        ld = randint(0, down)
        left = width - heightscale
        lw = randint(0, left)
        side = heightscale
    
    
    return (ld,lw,int(side), int(side))
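The same idea can be written more compactly with explicit row and column names. This is a sketch under the assumption that the image is a NumPy array in (rows, columns, channels) layout; the function name random_square_crop and its parameters are not part of the original code, and it returns the cropped image directly instead of a rectangle.

```python
import numpy as np

def random_square_crop(img, scale=0.9, rng=None):
    """Cut a randomly placed square with edge scale * min(rows, cols) out of img."""
    rng = np.random.default_rng() if rng is None else rng
    rows, cols = img.shape[:2]
    side = int(scale * min(rows, cols))
    # The upper bound of integers() is exclusive, hence the + 1.
    top = int(rng.integers(0, rows - side + 1))
    left = int(rng.integers(0, cols - side + 1))
    return img[top:top + side, left:left + side]

# Toy frame (hypothetical) in a common webcam resolution.
frame = np.zeros((480, 640, 3), dtype=np.uint8)
crop = random_square_crop(frame, rng=np.random.default_rng(0))
print(crop.shape)  # (432, 432, 3)
```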

The function getFrameRGB is a utility function that extracts a square image from the original image using the rectangle returned by getRect.

def getFrameRGB(img, rect):

    retimg = np.zeros((rect[2], rect[3], 3), 'uint8')
    
    assert(rect[2] == rect[3]) 

    retimg[:,:,:] = img[rect[1]:rect[1]+rect[3],rect[0]:rect[0] + rect[2],:]
    
    assert (rect[2] == retimg.shape[0])
    assert (rect[2] == retimg.shape[1])
    
    return retimg

The function getFrameGrey is a utility function that extracts a greyscale square image from an original image using the rectangle returned by getRect. It corresponds to the function above, but it is only used for greyscale images. We will later use this function for the masks.

def getFrameGrey(img, rect):

    retimg = np.zeros((rect[2], rect[3]), 'uint8')
    
    assert(rect[2] == rect[3]) 

    retimg[:,:] = img[rect[1]:rect[1]+rect[3],rect[0]:rect[0] + rect[2]]
    
    assert (rect[2] == retimg.shape[0])
    assert (rect[2] == retimg.shape[1])
    
    return retimg

The functions getFrameRGB and getFrameGrey described above are both used in the function createMasks below. createMasks iterates through a directory containing json files. It opens each json file, decodes the image and stores it in the variable img. The square to cut out is determined from img with the function getRect.

Each json file contains a polygon with its vertices. The vertices are parsed and a new mask image is generated from them. The mask is a greyscale image whose pixels have the value 255 if they are inside the polygon and 0 if they are outside the polygon.

Finally we have two images, img_resized and mask_resized, which were copied from img and the mask.

def createMasks(pre, sourcejsonsdir, destimagesdir, destmasksdir):

    assocf = open(os.path.join(path,"assoc_orig.txt"), "w")
    
    count = 0
    directory = sourcejsonsdir
    for filename in os.listdir(directory):
        if filename.endswith(".json"):
            print("{}:{}".format(count,os.path.join(directory, filename)))
            
            f = open(os.path.join(directory, filename))
            data = json.load(f)
            img_arr = data['imageData']  
            imgdata = base64.b64decode(img_arr)

            img = cv2.imdecode(np.frombuffer(imgdata, dtype=np.uint8), flags=cv2.IMREAD_COLOR)

            img_resized = img.copy()
            rect = getRect(img_resized)

            finalmask = np.zeros((img_resized.shape[0], img_resized.shape[1]), 'uint8')
            mthresh = np.zeros((img_resized.shape[0], img_resized.shape[1]), 'uint8')
            masks=[]

            for shape in data['shapes']:

                vertices = np.array([[point[1],point[0]] for point in shape['points']])
                vertices = vertices.astype(int)

                rr, cc = polygon(vertices[:,0], vertices[:,1], img.shape)
                mask_orig = np.zeros((img.shape[0], img.shape[1]), 'uint8')
                mask_orig[rr,cc] = 255
                masks.append(mask_orig)

            for m in masks:
                _,mthresh = cv2.threshold(m,1,255,cv2.THRESH_BINARY_INV)
                finalmask = cv2.bitwise_and(finalmask,finalmask,mask = mthresh)
                finalmask += m


            img_resized = img.copy()
            mask_resized = finalmask.copy()
            
            img_store = cv2.resize(getFrameRGB(img_resized, rect), dim, interpolation = cv2.INTER_AREA) 
            mask_store = cv2.resize(getFrameGrey(mask_resized, rect), dim, interpolation = cv2.INTER_AREA)
            alpha = 0.8 + 0.4*random()
            beta = int(random()*15)
            img_store = cv2.convertScaleAbs(img_store, alpha=alpha, beta=beta)
                
            cv2.imwrite(os.path.join(destimagesdir, "{}_0_{:05d}.png".format(pre,count)), img_store)
            cv2.imwrite(os.path.join(destmasksdir, "{}_0_{:05d}.png".format(pre,count)), mask_store)

            img_store = cv2.resize(getFrameRGB(img_resized, rect), dim, interpolation = cv2.INTER_AREA) 
            mask_store = cv2.resize(getFrameGrey(mask_resized, rect), dim, interpolation = cv2.INTER_AREA)
            alpha = 0.8 + 0.4*random()
            beta = int(random()*15)
            img_store = cv2.convertScaleAbs(img_store, alpha=alpha, beta=beta)
            img_store = cv2.rotate(img_store, cv2.ROTATE_90_CLOCKWISE)
            mask_store = cv2.rotate(mask_store, cv2.ROTATE_90_CLOCKWISE)
            
            cv2.imwrite(os.path.join(destimagesdir, "{}_90_{:05d}.png".format(pre,count)), img_store)
            cv2.imwrite(os.path.join(destmasksdir, "{}_90_{:05d}.png".format(pre,count)), mask_store)

            img_store = cv2.resize(getFrameRGB(img_resized, rect), dim, interpolation = cv2.INTER_AREA) 
            mask_store = cv2.resize(getFrameGrey(mask_resized, rect), dim, interpolation = cv2.INTER_AREA)
            alpha = 0.8 + 0.4*random()
            beta = int(random()*15)
            img_store = cv2.convertScaleAbs(img_store, alpha=alpha, beta=beta)
            img_store = cv2.rotate(img_store, cv2.ROTATE_180)
            mask_store = cv2.rotate(mask_store, cv2.ROTATE_180)
            
            cv2.imwrite(os.path.join(destimagesdir, "{}_180_{:05d}.png".format(pre,count)), img_store)
            cv2.imwrite(os.path.join(destmasksdir, "{}_180_{:05d}.png".format(pre,count)), mask_store)

            img_store = cv2.resize(getFrameRGB(img_resized, rect), dim, interpolation = cv2.INTER_AREA) 
            mask_store = cv2.resize(getFrameGrey(mask_resized, rect), dim, interpolation = cv2.INTER_AREA)
            alpha = 0.8 + 0.4*random()
            beta = int(random()*15)
            img_store = cv2.convertScaleAbs(img_store, alpha=alpha, beta=beta)
            img_store = cv2.rotate(img_store, cv2.ROTATE_90_COUNTERCLOCKWISE)
            mask_store = cv2.rotate(mask_store, cv2.ROTATE_90_COUNTERCLOCKWISE)
            
            cv2.imwrite(os.path.join(destimagesdir, "{}_270_{:05d}.png".format(pre,count)), img_store)
            cv2.imwrite(os.path.join(destmasksdir, "{}_270_{:05d}.png".format(pre,count)), mask_store)

            # close the json file of this iteration before moving on to the next one
            f.close()
            count += 1
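To illustrate how a list of polygon vertices turns into a 0/255 mask, here is a small NumPy-only sketch based on the even-odd rule. It is a stand-in for the polygon function used in createMasks, not its implementation: the name polygon_mask is made up, the vertices are given as (x, y) pairs as in the labelme points, and pixels are tested at their centers, so edge pixels may differ from the library result.

```python
import numpy as np

def polygon_mask(points_xy, shape):
    """Rasterize a polygon given as (x, y) vertices into a uint8 mask of the given (rows, cols)."""
    rows, cols = shape
    ys, xs = np.mgrid[0:rows, 0:cols]
    px = xs + 0.5                      # pixel centers
    py = ys + 0.5
    inside = np.zeros(shape, dtype=bool)
    pts = np.asarray(points_xy, dtype=float)
    n = len(pts)
    for i in range(n):
        x1, y1 = pts[i]
        x2, y2 = pts[(i + 1) % n]
        if y1 == y2:
            continue                   # horizontal edges never cross a horizontal ray
        # Does a ray going right from the pixel center cross this edge?
        crosses = (y1 > py) != (y2 > py)
        x_at_y = x1 + (py - y1) * (x2 - x1) / (y2 - y1)
        inside ^= crosses & (px < x_at_y)
    return np.where(inside, 255, 0).astype(np.uint8)

# Toy polygon (hypothetical): an axis-aligned rectangle in a 6x8 image.
toy_mask = polygon_mask([(2, 1), (6, 1), (6, 4), (2, 4)], (6, 8))
print(toy_mask)
```

For the rectangle above, rows 1 to 3 and columns 2 to 5 are set to 255 and everything else stays 0.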

Data augmentation is done within createMasks as well. The function cuts square images out of img_resized and mask_resized, resizes them to dim and stores them in img_store and mask_store. Both are square images since createMasks uses getRect, getFrameRGB and getFrameGrey. The brightness and the contrast of img_store are changed randomly with the OpenCV method convertScaleAbs. Then a rotation is applied to the image and the mask.

This is done four times (rotations by 0, 90, 180 and 270 degrees). So for each input image createMasks creates four rotated output images with randomized brightness and contrast and saves them into a directory.

createMasks("train", fullpathjson, fullpathimages, fullpathmasks)
createMasks("valid", fullpathjsonvalid, fullpathimagesvalid, fullpathmasksvalid)
createMasks("test", fullpathjsontest, fullpathimagestest, fullpathmaskstest)

In the above code you see how createMasks is applied to the json files inside the train, validation and test directories. The output images are stored in the fullpathimages, fullpathimagesvalid and fullpathimagestest directories and the output masks in the fullpathmasks, fullpathmasksvalid and fullpathmaskstest directories. Since we originally have 3500 images, the code above produces 3500 times 4 images due to data augmentation, which is 14000.
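The important point of this augmentation is that the image and its mask receive the same rotation, while brightness and contrast are changed for the image only. The following NumPy sketch mimics this; scale_abs is a simplified stand-in for convertScaleAbs, and the names augment and scale_abs as well as the toy data are assumptions for illustration.

```python
import numpy as np

def scale_abs(img, alpha, beta):
    """Stand-in for convertScaleAbs: |alpha * img + beta|, saturated to uint8."""
    out = np.abs(alpha * img.astype(np.float32) + beta)
    return np.clip(np.rint(out), 0, 255).astype(np.uint8)

def augment(img, mask, rng):
    """Return four (image, mask) pairs rotated clockwise by 0, 90, 180 and 270 degrees."""
    pairs = []
    for k in range(4):
        alpha = 0.8 + 0.4 * rng.random()     # contrast factor in [0.8, 1.2)
        beta = int(rng.random() * 15)        # brightness offset 0..14
        changed = scale_abs(img, alpha, beta)
        # A negative k rotates clockwise; image and mask get the same rotation.
        pairs.append((np.rot90(changed, k=-k), np.rot90(mask, k=-k)))
    return pairs

# Toy data (hypothetical): a uniform image and a mask with a single marked pixel.
rng = np.random.default_rng(0)
img = np.full((4, 4, 3), 100, dtype=np.uint8)
mask = np.zeros((4, 4), dtype=np.uint8)
mask[0, 1] = 255
pairs = augment(img, mask, rng)
print([tuple(int(v[0]) for v in np.nonzero(m)) for _, m in pairs])
```

The printed positions show how the marked mask pixel moves with each rotation, which is exactly what has to happen to the barcode area in the real masks.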

Training

We generated the model for training with get_unet (see below), which was written by Tobias Sterbak. The original code can be found here. Only a few modifications were made, the main one being one of the last lines:

c10 = Conv2D(2, (1, 1), activation="softmax") (c9)

The output of the model is therefore a two-channel image, which represents a mask indicating the barcode of an input image. A pixel of the first channel set to 1 indicates a barcode, and a pixel of the second channel set to 1 indicates no barcode. We do not explain the code any further and refer to the original code.

def conv2d_block(input_tensor, n_filters, kernel_size=3, batchnorm=True):
    # first layer
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), kernel_initializer="he_normal",
               padding="same")(input_tensor)
    if batchnorm:
        x = BatchNormalization()(x)
    x = Activation("relu")(x)
    # second layer
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), kernel_initializer="he_normal",
               padding="same")(x)
    if batchnorm:
        x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

def get_unet(input_img, n_filters=16, dropout=0.5, batchnorm=True):
    # contracting path
    c1 = conv2d_block(input_img, n_filters=n_filters*1, kernel_size=3, batchnorm=batchnorm)
    p1 = MaxPooling2D((2, 2)) (c1)
    p1 = Dropout(dropout*0.5)(p1)

    c2 = conv2d_block(p1, n_filters=n_filters*2, kernel_size=3, batchnorm=batchnorm)
    p2 = MaxPooling2D((2, 2)) (c2)
    p2 = Dropout(dropout)(p2)

    c3 = conv2d_block(p2, n_filters=n_filters*4, kernel_size=3, batchnorm=batchnorm)
    p3 = MaxPooling2D((2, 2)) (c3)
    p3 = Dropout(dropout)(p3)

    c4 = conv2d_block(p3, n_filters=n_filters*8, kernel_size=3, batchnorm=batchnorm)
    p4 = MaxPooling2D(pool_size=(2, 2)) (c4)
    p4 = Dropout(dropout)(p4)
    
    c5 = conv2d_block(p4, n_filters=n_filters*16, kernel_size=3, batchnorm=batchnorm)
    
    # expansive path
    u6 = Conv2DTranspose(n_filters*8, (3, 3), strides=(2, 2), padding='same') (c5)
    u6 = concatenate([u6, c4])
    u6 = Dropout(dropout)(u6)
    c6 = conv2d_block(u6, n_filters=n_filters*8, kernel_size=3, batchnorm=batchnorm)

    u7 = Conv2DTranspose(n_filters*4, (3, 3), strides=(2, 2), padding='same') (c6)
    u7 = concatenate([u7, c3])
    u7 = Dropout(dropout)(u7)
    c7 = conv2d_block(u7, n_filters=n_filters*4, kernel_size=3, batchnorm=batchnorm)

    u8 = Conv2DTranspose(n_filters*2, (3, 3), strides=(2, 2), padding='same') (c7)
    u8 = concatenate([u8, c2])
    u8 = Dropout(dropout)(u8)
    c8 = conv2d_block(u8, n_filters=n_filters*2, kernel_size=3, batchnorm=batchnorm)

    u9 = Conv2DTranspose(n_filters*1, (3, 3), strides=(2, 2), padding='same') (c8)
    u9 = concatenate([u9, c1], axis=3)
    u9 = Dropout(dropout)(u9)
    c9 = conv2d_block(u9, n_filters=n_filters*1, kernel_size=3, batchnorm=batchnorm)
    
    c10 = Conv2D(2, (1, 1), activation="softmax") (c9)
    
    model = Model(inputs=[input_img], outputs=[c10])
    return model

Below you see the code of the data generator used during training. We do not want to load all generated images into memory at once, so we use batches. The function generatebatchdata delivers batches of training data for the training process. Note that generatebatchdata additionally randomizes the brightness and contrast of about 70% of the training images to augment the data further. The code was described in previous posts, so we omit a further description.

def generatebatchdata(batchsize, fullpathimages, fullpathmasks):
  
    imagenames = os.listdir(fullpathimages)
    imagenames.sort()

    masknames = os.listdir(fullpathmasks)
    masknames.sort()

    for i in range(len(imagenames)):
        assert(imagenames[i] == masknames[i])

    while True:
        batchstart = 0
        batchend = batchsize    
        
        while batchstart < len(imagenames):
            
            imagelist = []
            masklist = []
            
            limit = min(batchend, len(imagenames))

            for i in range(batchstart, limit):
                if imagenames[i].endswith(".png"):
                    img = cv2.imread(os.path.join(fullpathimages,imagenames[i]),cv2.IMREAD_COLOR )
                    # random() is called directly, as in createMasks (from random import random)
                    if random() > 0.3:
                        alpha = 0.8 + 0.4*random()
                        beta = int(random()*15)
                        img = cv2.convertScaleAbs(img, alpha=alpha, beta=beta)
                    imagelist.append(img)
                if masknames[i].endswith(".png"):
                    img0 = np.zeros(dim, 'uint8')
                    img1 = np.zeros(dim, 'uint8')
                    img0 = cv2.imread(os.path.join(fullpathmasks,masknames[i]),cv2.IMREAD_UNCHANGED)
                    img0 = np.where(img0 > 0, 1, 0)   
                    img1 = np.where(img0 > 0, 0, 1)  
                    img = np.zeros((dim[0], dim[1], 2),'uint8')
                    msum = np.sum(np.array(img0) + np.array(img1))
                    assert(msum == dim[0]*dim[1])
                    img[:,:,0] = img0[:,:]
                    img[:,:,1] = img1[:,:]
                    masklist.append(img)

            train_data = np.array(imagelist, dtype=np.float32)
            train_mask= np.array(masklist, dtype=np.float32)

            train_data -= train_data.mean()
            train_data /= train_data.std()
            
            yield (train_data,train_mask)    

            batchstart += batchsize   
            batchend += batchsize
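Two steps of the generator are worth looking at in isolation: the conversion of a 0/255 mask into the two-channel target that matches the softmax output, and the standardization of an image batch. The sketch below shows both in NumPy; the function names encode_mask and standardize and the toy arrays are made up, and the guard against a zero standard deviation is an addition that the generator above does not have.

```python
import numpy as np

def encode_mask(mask):
    """Turn a 0/255 mask into a (rows, cols, 2) target: channel 0 barcode, channel 1 background."""
    barcode = (mask > 0).astype(np.float32)
    return np.stack([barcode, 1.0 - barcode], axis=-1)

def standardize(batch):
    """Shift a batch to zero mean and scale it to unit standard deviation."""
    batch = batch.astype(np.float32)
    centered = batch - batch.mean()
    std = centered.std()
    return centered / std if std > 0 else centered

# Toy data (hypothetical).
mask = np.array([[0, 255], [255, 0]], dtype=np.uint8)
encoded = encode_mask(mask)
images = np.arange(24, dtype=np.float32).reshape(2, 2, 2, 3)
normalized = standardize(images)

print(encoded[:, :, 0])
print(float(normalized.mean()), float(normalized.std()))
```

Every pixel of the encoded mask sums to 1 over the two channels, which is what the categorical cross-entropy loss used for training expects.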

We decided to use a batch size of ten for model training. The input layer is instantiated and assigned to input_img. The function get_unet creates the model, which is then compiled.

We use predefined callback functions for training, such as early stopping. The callback functions were described in previous posts as well.

The variables stepstrainimages and stepsvalidimages are needed for the fit method to set its parameters steps_per_epoch and validation_steps, see code below.

batchsize = 10

input_img = Input((dim[0], dim[1], 3), name='img')
model = get_unet(input_img, n_filters=1, dropout=0.0, batchnorm=True)
model.compile(optimizer=Adam(), loss="categorical_crossentropy", metrics=["accuracy"])

#model.load_weights(os.path.join(path, dirmodels,modelname))

callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ModelCheckpoint(os.path.join(fullpathmodels,modelweightname), verbose=1, save_best_only=True, save_weights_only=True)
]

stepstrainimages = len(os.listdir(fullpathimages))//batchsize
stepsvalidimages = len(os.listdir(fullpathimagesvalid))//batchsize
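The step counts are plain integer divisions, so a remainder smaller than one batch is not counted. The sketch below compares this with rounding up; the function name steps_for is made up and the image counts are assumptions (2730 training json files times four augmented images would give 10920).

```python
import math

def steps_for(num_images, batchsize):
    """Return (steps with floor division, steps when a partial last batch is counted)."""
    full = num_images // batchsize
    with_remainder = math.ceil(num_images / batchsize)
    return full, with_remainder

print(steps_for(10920, 10))  # (1092, 1092)
print(steps_for(10925, 10))  # (1092, 1093)
```

As long as the number of images is a multiple of the batch size, both variants agree, so the floor division used above loses nothing in that case.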

Below you find the code that instantiates the data generators for training and validation and calls the fit method to train the model. The number of epochs was set to 20, but the code below can be executed several times in a row (especially if you use a Jupyter notebook). After training, the model's weights (modelname) and the model structure (modelnamejson) are saved.

generator_train = generatebatchdata(batchsize, fullpathimages, fullpathmasks)
generator_valid = generatebatchdata(batchsize, fullpathimagesvalid, fullpathmasksvalid)
model.fit(generator_train,steps_per_epoch=stepstrainimages, epochs=20, callbacks=callbacks, validation_data=generator_valid, validation_steps=stepsvalidimages)

model.save_weights(os.path.join(path, dirmodels,modelname))
json_model = model.to_json()
with open(os.path.join(path, dirmodels,modelnamejson), "w") as json_file:
    json_file.write(json_model)

For testing the model, we use the code below. It loads all test images into memory. Since there are only a limited number of them, storing them in an array does not cause a memory problem. The images are then standardized with the mean and the standard deviation.

imagetestlist = []

imagetestnames = os.listdir(fullpathimagestest)
imagetestnames.sort()

for imagename in imagetestnames:
    if imagename.endswith(".png"):
        imagetestlist.append(cv2.imread(os.path.join(fullpathimagestest,imagename),cv2.IMREAD_COLOR ))
        
test_data = np.array(imagetestlist, dtype=np.float32)
test_data -= test_data.mean()
test_data /= test_data.std()

predictions = model.predict(test_data, batch_size=1, verbose=1)

The predict method above predicts the masks of the test images and returns the result in the predictions variable.

Below you find the code that sets the pixel values of the predicted mask to 255 where the prediction is above 0.5, and to 0 otherwise.

ind = 56
plt.imshow(imagetestlist[ind])

img = predictions[ind][:,:,0]
img = np.where(img > 0.5, 255, 0)
plt.imshow(img)
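The effect of the threshold can be tried on a tiny made-up prediction. This is a sketch only: the function name to_mask and the toy values are assumptions, and channel 0 is taken as the barcode channel as described above.

```python
import numpy as np

def to_mask(prediction, threshold=0.5):
    """prediction: softmax output of shape (rows, cols, 2); returns a 0/255 uint8 mask."""
    return np.where(prediction[:, :, 0] > threshold, 255, 0).astype(np.uint8)

# Toy prediction (hypothetical) with barcode probabilities in channel 0.
toy = np.zeros((2, 2, 2), dtype=np.float32)
toy[:, :, 0] = [[0.9, 0.4], [0.2, 0.6]]
toy[:, :, 1] = 1.0 - toy[:, :, 0]

mask_05 = to_mask(toy)
mask_03 = to_mask(toy, threshold=0.3)
print(mask_05)
print(mask_03)
```

A lower threshold marks more pixels as barcode, which can help to close gaps in the mask at the price of more false positives.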

Figure 2 shows two images. The first image on the left side is the original image and the second image on the right side is the predicted mask image. You can see that the mask image is clearly showing the location of the pixels indicating a barcode.

Figure 2: Predicted mask

Live Example

The idea of this project was to use a live video stream and apply the predict function to the images of the video stream to generate masks. The masks can be overlaid on the original images and shown on the display.

Another idea is to apply a barcode scanning software to the original image to read the barcode’s content. The content is then put as text onto the display.

Below we first open the file containing the model structure and read it into loaded_model_json. The function model_from_json creates the model loaded_model from this structure. The code then loads the weights stored in modelname into loaded_model.

json_file = open(os.path.join(fullpathmodels, modelnamejson), 'r')
loaded_model_json = json_file.read()
json_file.close()
loaded_model = model_from_json(loaded_model_json)

loaded_model.load_weights(os.path.join(path, dirmodels,modelname))

The images of the video stream are not square, so we need to extract square images from them. For this we have written the function getRectStatic. It basically does the same as the function getRect, but the randomization of the position was taken out, so the square is always centered.

def getRectStatic(img):
    
    width = img.shape[0]
    height = img.shape[1]
    lw = 0
    ld = 0
    side = 0
    
    if height > width:
    
        widthscale = int(0.9*width)
        left = width - widthscale
        lw = left//2
        down = height - widthscale
        ld = down//2
        side = widthscale
    
    else:
    
        heightscale = int(0.9*height)
        down = height - heightscale
        ld = down//2
        left = width - heightscale
        lw = left//2
        side = heightscale
    
    
    return (ld,lw,int(side), int(side))

The code below displays the image from a webcam overlaid with the predicted mask image. The OpenCV class VideoCapture instantiates a video stream object. Inside the endless while loop, an image is read from the video stream. The image is passed to getRectStatic to obtain the coordinates of a square image. The square image img is then cut out of frame. The image img is resized and put into the one-element list imgpredict. This element is then standardized with the mean and the standard deviation.

The Keras method predict processes imgpredict and returns the predicted output (one mask image) in the one-element list predictions. The NumPy method where sets each pixel value of the mask image to 255 or to 0, depending on whether the value is above 0.3 or not.

The code then resizes the mask image for the overlay and determines the largest contour of the mask image with the OpenCV method findContours. The contour is used to compute a rotated rectangle around it (minAreaRect), which is drawn onto the image (drawContours).

The function decode is a function of the pyzbar library. It reads the content of a barcode from an image and returns it in the variable result. The code iterates through result and stores the recognized barcode content in bardcodetxt. The string in bardcodetxt is put onto the display as well.

vid = cv2.VideoCapture(0) 

saveimg = np.zeros((dim[0], dim[1], 3), "uint8")
 
count = 0    

bardcodetxt = ""
    
while(True): 

    ret, frame = vid.read() 
    # stop if the camera did not deliver an image
    if not ret:
        break
    
    d,w,side,_ = getRectStatic(frame)
    
    img = np.zeros((side, side, 3), "uint8")
    
    img[::] = frame[w:w+side, d:d+side,:]
    
    imgpredict = []
    imgpredict.append(cv2.resize(img, dim, interpolation = cv2.INTER_AREA))
    imgpredict = np.array(imgpredict, dtype=np.float32)
    imgpredict -= imgpredict.mean()
    imgpredict /= imgpredict.std()
    
    predictions = loaded_model.predict(imgpredict, batch_size=1, verbose=0)
    
    prediction = predictions[0][:,:,0]
    prediction = np.where(prediction > 0.3, 255, 0)
    prediction = np.array(prediction, "uint8")

    
    predresized = np.zeros((side, side, 3), np.uint8)
    predresized[:,:,2] = cv2.resize(prediction, (side, side), interpolation = cv2.INTER_AREA)[:,:]
    contours, hierarchy = cv2.findContours(prediction,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
    cnts = sorted(contours, key=cv2.contourArea)

    result = decode(img)
    
    if len(cnts) > 0:
        rect = cv2.minAreaRect(cnts[-1])
        box = cv2.boxPoints(rect)
        box *= side/dim[0]
        # np.int0 is no longer available in NumPy 2.0, so convert explicitly
        box = box.astype(np.int32)
        img = cv2.drawContours(img,[box],0,(0,255,0),2)
        
    newimg = cv2.addWeighted(img, 0.5, predresized, 0.5, 0)

    # newimg has to exist before it is copied, so the loop runs after the overlay is built
    for index,i in enumerate(result):
        bardcodetxt=i.data.decode("utf-8")
        saveimg = newimg.copy()

    newimg = cv2.putText(newimg, bardcodetxt, (20,30), cv2.FONT_HERSHEY_SIMPLEX,  1, (255,255,255), 2, cv2.LINE_AA)

    cv2.imshow('frame',newimg) 

      
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'): 
        break
    if key == ord('s'):
        cv2.imwrite(os.path.join(fullpathimagestest, f"{count}.png"), cv2.resize(img, dim, interpolation = cv2.INTER_AREA))
        count += 1
vid.release()
cv2.destroyAllWindows() 
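The mask is predicted at the size dim, while the rectangle is drawn on the larger square image, so the coordinates have to be scaled by side/dim[0]. The NumPy sketch below shows this scaling with an axis-aligned bounding box. This is a deliberately simpler stand-in for the largest contour and minAreaRect used above: it considers all mask pixels at once and does not rotate the box. The function name mask_bbox and the toy values are assumptions.

```python
import numpy as np

def mask_bbox(mask, side):
    """Return (left, top, right, bottom) of the mask's foreground, scaled to a side x side image."""
    rows, cols = np.nonzero(mask)
    if rows.size == 0:
        return None                      # no barcode pixels predicted
    scale = side / mask.shape[0]
    top, bottom = rows.min() * scale, (rows.max() + 1) * scale
    left, right = cols.min() * scale, (cols.max() + 1) * scale
    return int(left), int(top), int(right), int(bottom)

# Toy mask (hypothetical): 128x128 prediction, shown on a 432x432 square image.
toy_pred = np.zeros((128, 128), dtype=np.uint8)
toy_pred[32:64, 16:48] = 255

print(mask_bbox(toy_pred, 432))  # (54, 108, 162, 216)
```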

Results

In Figure 3 you find a screenshot of the displayed video stream. Here somebody holds a bottle with a barcode into the scene of the webcam. The predicted mask of the barcode is overlaid on the original video stream. You can see the rectangle around the mask (green). In the upper left corner you find the content of the barcode computed by the pyzbar library.

Figure 3: Live barcode detection and barcode content

The live video stream actually works pretty well. It shows the position of barcodes whenever you put one in front of the webcam. The barcode reading software (pyzbar), however, needs several retries to read the content of the barcode correctly. In general we can say that neural networks are well suited for determining the barcode position.

Acknowledgement

Thank you very much to the master students of the class Embedded Systems of the winter semester 2020. As their semester assignment, seven students labeled 3500 images, which is very time-consuming work.

Also special thanks to the University of Applied Science Albstadt-Sigmaringen for providing the infrastructure and the appliances to enable this class.

Live 3D image generation with a Jetson Nano

Introduction

Early this year we received the 3D camera O3D303 from IFM. The camera captures 3D images with a size of 176×132 pixels at a distance of up to 8m. The user can retrieve the 3D images from the camera via TCP/IP. IFM published the protocols to access the images from the camera on their download site. Picture 1 is a 3D image showing a person stretching out his hand. The brightness of each image pixel indicates its distance from the camera lens.

Picture 1: 3D Image

This year we talked to a company that sells driverless transport systems. We thought we could help them apply our camera to a specific problem they have with these systems. Unfortunately their interest was limited, but they were interested in another use case related to this topic. The company is trying to cut the costs of its products by looking for a substitute for the 3D camera. The idea was to train a neural network in a constrained environment with images from regular cameras and from a 3D camera. During normal operation the driverless transport system should then capture images with regular cameras only and predict 3D images with the trained neural network. We have not made any business case with this company; however, we kept the idea and continued to do research on it.

Experimental setup

The first goal was to build an experimental setup which enabled us to capture the regular color images and 3D images needed to train a neural network. Later we use the same setup for testing the prediction of 3D images. We built a frame from bar profiles and a top-hat rail to attach power supplies (5V and 24V) and a Jetson Nano computer, see Picture 2, top and bottom. It was the first time we used a Jetson Nano computer, and we were looking forward to learning about its performance.

Picture 2: Experimental Setup

In the upper picture you see two camera serial interface (CSI) cameras attached to the top bar profile at a distance of 8cm from each other. This is roughly the distance between human eyes, which we want to replicate. The Jetson Nano (on the top-hat rail, lower picture) has two CSI interfaces, so we connected both cameras to it.

The IFM camera is attached to the top of the frame, see the upper right corner of the top picture. It requires 24V from the power supply. To capture its images with the Jetson Nano, we use the 3D camera’s Ethernet interface.

Below are some globally defined variables, which show up frequently in this article. All images we use have the dimension 128×128. We also defined a path for the image data (path). The captured image data, from which the training, validation and testing sets are built later, are stored in the “left”, “right” and “3d” directories (dataleft, dataright, data3d).

dim = (128, 128) 
limit = 128

path =  r'...\3D'

dataleft = os.path.join(path, r"data\left")
dataright = os.path.join(path, r"data\right")
data3d = os.path.join(path, r"data\3d")

The producer process to trigger image capturing

The Jetson Nano has two CSI interfaces, one for each CSI camera. Unfortunately the Jetson Nano’s hardware does not synchronize the two cameras. You can see this problem when a program displays the captured images in real time with OpenCV: there is a noticeable delay between the two images. It becomes even worse when the program also displays 3D images from the 3D camera in real time.

Another goal is to create a large number of training, validation and testing images from the left, right and 3D camera. However, the delay between the captured images complicates the image collection, because the scenes to be captured need to hold still.

We found that if we run the image capturing programs in three separate processes, the Ubuntu operating system installed on the Jetson Nano takes care of the synchronization. Delays between the left and right camera are no longer visible to that extent. Only the 3D camera shows a small delay, which we assume is due to the reaction time of the 3D camera itself. So before wiring extra hardware, we decided to let the operating system do the synchronization job.

So the decision was made to have three separate processes to capture images from the three cameras. To do this, we implemented an inter-process communication, so that a producer process can send commands to three consumer processes, which capture images from the left, right and 3D cameras. For simplicity we use memory mapping and a file named command.txt with a size of 140 bytes. The code below shows how this file is created, so that all three image capturing processes can consume its content in the form of a JSON string.

COMMANDSIZE = 140
COMMANDNAME = "command.txt"
text = '{"count" :0, "command": "none", "attr": ""}'
text = text.ljust(COMMANDSIZE)

textfile = open(COMMANDNAME, "w")
textfile.write(text)
textfile.close()

The producer program calls the function senddata, which takes a number (counter), a command string (command) and an attribute string (attr) as parameters and writes them into a JSON string, see below. The JSON string is written into a memory map, which is linked to the command.txt file.

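import json
import mmap

def senddata(counter, command, attr):
    # Write the command as a JSON string, padded to COMMANDSIZE, into the memory-mapped file
    with open(COMMANDNAME, "r+b") as f:
        data = {"count": counter, "command": command, "attr": attr}
        datastr = json.dumps(data)
        datastr = datastr.ljust(COMMANDSIZE)

        mm = mmap.mmap(f.fileno(), 0)
        print(datastr.encode('utf-8'))
        mm.write(datastr.encode('utf-8'))
        mm.close()  # release the mapping again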

Below is an example of how the producer program triggers the image capturing in all three consumer processes. It additionally sends a string with time and date information. The consumer processes use this string to create the names of their captured images.

from datetime import datetime

now = datetime.now()
date_time = now.strftime("%m_%d_%Y__%H_%M_%S")
senddata(count, "shoot", date_time)
count += 1

The function senddata above takes count, “shoot” and date_time and writes them into the command.txt file. The consumer processes (the running left, right and 3D camera image capturing programs) are constantly reading this file, while the variable count is used to sense if a new command is issued.

Consumer processes to capture images

We implemented three separate image capturing programs (consumer processes) and run them as different processes. The producer process sends out commands via memory map, which is linked to the command.txt file. All three consumer processes are repeatedly reading the command.txt file to find out if a new command was issued. This is done by checking for an increased count value.

Below you see the function getdata, which reads the json string out of command.txt, parses the json content and returns the values from it.

COMMANDSIZE = 140
COMMANDNAME = "command.txt"

def getdata():
    with open(COMMANDNAME, "r+b") as f:
        mm = mmap.mmap(f.fileno(), 0)
        strdata = mm.readline().decode('utf-8')  # senddata encodes the string as UTF-8

        data = json.loads(strdata)
        
        return data["count"], data["command"], data["attr"]
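
The interplay of producer and consumer can be tried out without any camera. The following is a minimal sketch under our own assumptions: the helper names create_command_file, send_command, read_command and poll_once are hypothetical and only mirror senddata and getdata, and a temporary file stands in for command.txt.

```python
import json
import mmap
import os
import tempfile

COMMANDSIZE = 140  # fixed size of the command file in bytes, as in the article


def create_command_file(filename):
    """Create the fixed-size command file with an initial 'none' command."""
    text = json.dumps({"count": 0, "command": "none", "attr": ""}).ljust(COMMANDSIZE)
    with open(filename, "w") as f:
        f.write(text)


def send_command(filename, counter, command, attr):
    """Producer side: overwrite the file content through a memory map."""
    payload = json.dumps({"count": counter, "command": command, "attr": attr})
    payload = payload.ljust(COMMANDSIZE).encode("utf-8")
    with open(filename, "r+b") as f, mmap.mmap(f.fileno(), 0) as mm:
        mm.write(payload)


def read_command(filename):
    """Consumer side: read and parse the JSON string from the memory map."""
    with open(filename, "r+b") as f, mmap.mmap(f.fileno(), 0) as mm:
        data = json.loads(mm[:COMMANDSIZE].decode("utf-8"))
    return data["count"], data["command"], data["attr"]


def poll_once(filename, current):
    """Return (new_current, (command, attr)), or (current, None) if nothing new was issued."""
    count, command, attr = read_command(filename)
    if count != current:
        return count, (command, attr)
    return current, None


with tempfile.TemporaryDirectory() as tmpdir:
    demo_file = os.path.join(tmpdir, "command.txt")
    create_command_file(demo_file)

    current, first = poll_once(demo_file, -1)       # the initial "none" command is seen once
    current, idle = poll_once(demo_file, current)   # count unchanged, nothing to do
    send_command(demo_file, 1, "shoot", "01_31_2021__12_00_00")
    current, shot = poll_once(demo_file, current)   # increased count, new command detected
```

Note that this scheme has no locking. A consumer could in principle read the file while the producer is still writing; with a 140 byte payload this is unlikely, but it is a limitation of the simple approach.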

Image capturing with the 3D camera

The documentation of IFM’s O3D303 camera can be found here. Before accessing the camera via TCP/IP, we configured it to use DHCP, so that the 3D camera retrieves an IP address. The Jetson Nano finds the 3D camera’s IP address with the help of the Linux nmap command, which scans all IP addresses in its network environment. After the scan, the code reads the ARP table with arp -a, which lists the MAC addresses related to the IP addresses. Since we know the MAC address of the camera, we can parse the IP address out of this list. See code below.

os.system('nmap -sP 143.90.26.0/24')
lines=[]
out = os.popen('arp -a').read()
lines = out.splitlines()

matching = [s for s in lines if "00:02:21:41:ef:53" in s]
matchstr = matching[0]
ip = matchstr[matchstr.find("(")+1:matchstr.find(")")]
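
The lookup above fails with an IndexError if the camera is not listed. As a sketch of a more defensive variant, the parsing can be put into a function that returns None in that case. The function name find_ip_for_mac and the sample output below are our own assumptions for illustration; the IP addresses in the sample are made up, and real arp -a output depends on the system.

```python
import re

# Hypothetical example of `arp -a` output on Linux; real output depends on the network.
SAMPLE_ARP_OUTPUT = """\
? (143.90.26.1) at 00:11:22:33:44:55 [ether] on eth0
? (143.90.26.57) at 00:02:21:41:ef:53 [ether] on eth0
"""


def find_ip_for_mac(arp_output, mac):
    """Return the IP address listed for the given MAC address, or None if it is missing."""
    for line in arp_output.splitlines():
        if mac.lower() in line.lower():
            # The IP address is the dotted quad between the parentheses
            match = re.search(r"\((\d{1,3}(?:\.\d{1,3}){3})\)", line)
            if match:
                return match.group(1)
    return None


camera_ip = find_ip_for_mac(SAMPLE_ARP_OUTPUT, "00:02:21:41:ef:53")
unknown_ip = find_ip_for_mac(SAMPLE_ARP_OUTPUT, "aa:bb:cc:dd:ee:ff")
```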

Communication between IFM’s 3D camera and the Jetson Nano is done via TCP/IP sockets. We have written the class Frame to retrieve 3D images from the 3D camera, see code below. By default the 3D camera opens port 50010 and continuously sends out 3D image data. Our 3D image capturing program, which uses the class Frame, connects to the 3D camera with the socket’s connect method. As parameters the program needs to pass the 3D camera’s IP address and the port number (which is 50010).

The method trigger of class Frame catches the image data sent by the 3D camera. It parses the data stream until it finds the byte sequence “\r\n0000star”. The following 255830 bytes contain the data of a single scene in five different image representations, which are subsequently stored in Frame’s attributes img0, img1, img2, img3, img4. See also chunktypes for the names of the image representations. Note that the number 255830 depends on the 3D camera configuration, which can be set with additional O3D303 software. We use the factory settings, so we did not change the configuration. The receiveAll method receives the 255830 bytes, and trigger stores them in the variable frame.

Each image representation consists of a header (with header size, image type, width, height and pixel format) and a body with the raw image data. Inside the method trigger, the program reads the images from the variable frame using the header’s width, height and pixel format information and stores them in Frame’s img0, img1, img2, img3, img4 attributes.

class Frame:
    
    def __init__(self):
        self.chunktypes = {
            100: "RADIAL_DISTANCE_IMAGE",
            101: "NORM_AMPLITUDE_IMAGE",
            103: "AMPLITUDE_IMAGE",
            104: "GRAYSCALE_IMAGE",
            200: "CARTESIAN_X_COMPONENT",
            201: "CARTESIAN_Y_COMPONENT",
            202: "CARTESIAN_Z_COMPONENT",
            203: "CARTESIAN_ALL",
            223: "UNIT_VECTOR_ALL",
            300: "CONFIDENCE_IMAGE",
            302: "DIAGNOSTIC",
            305: "JSON_DIAGNOSTIC",
        }

        self.pixelformat = {
            0: "FORMAT_8U",
            1: "FORMAT_8S",
            2: "FORMAT_16U",
            3: "FORMAT_16S",
            4: "FORMAT_32U",
            5: "FORMAT_32S",
            6: "FORMAT_32F",
            7: "FORMAT_64U",
            8: "FORMAT_64F",
            9: "Reserved",
            10: "FORMAT_32F_3",
        }

        self.pixelsize = {
            0: 1,
            1: 1,
            2: 2,
            3: 2,
            4: 4,
            5: 4,
            6: 4,
            7: 8,
            8: 8,
            9: 0,
            10: 0,
        }
        
    def getChunkType(self, data):
        return int.from_bytes(data[0:4],"little")

    def getChunkSize(self, data):
        return int.from_bytes(data[4:8],"little")

    def getHeaderSize(self, data):
        return int.from_bytes(data[8:12],"little")

    def getHeaderVersion(self, data):
        return int.from_bytes(data[12:16],"little")

    def getImageWidth(self, data):
        return int.from_bytes(data[16:20],"little")

    def getImageHeight(self, data):
        return int.from_bytes(data[20:24],"little")

    def getPixelFormat(self, data):
        return int.from_bytes(data[24:28],"little")

    def getTimeStamp(self, data):
        return int.from_bytes(data[28:32],"little")

    def getFrameCount(self, data):
        return int.from_bytes(data[32:36],"little")
 
    def connect(self, ip, port):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ip = ip
        self.port = port
        self.s.connect((self.ip, self.port))
         
    def close(self):
        self.s.close()
      
    def receiveAll(self, num):
        chunks = []
        bytes_recd = 0
        while bytes_recd < num:
            chunk = self.s.recv(min(num - bytes_recd, 2048))
            if chunk == b'':
                raise RuntimeError("socket connection broken")
            chunks.append(chunk)
            bytes_recd = bytes_recd + len(chunk)
        return b''.join(chunks)    

    def trigger(self):
        
        replies=[]
        reply=0
        frame = b''
        while reply!=b'\r':
            reply = self.s.recv(1)
        reply = self.s.recv(1)
        assert(reply==b'\n')
        reply = self.s.recv(4)
        assert(reply==b'0000')
        reply = self.s.recv(4)
        assert(reply==b'star')
        
        frame = self.receiveAll(255830)
        assert(frame[255824:255830]==b'stop\r\n')

        posimage0 = 0
        chunksizeimage0 = self.getChunkSize(frame[0:12*4])
        posimage1 = chunksizeimage0
        widthimage0 = self.getImageWidth(frame[posimage0:posimage0+12*4]) 
        heightimage0 = self.getImageHeight(frame[posimage0:posimage0+12*4])
        pixelsizeimage0 = self.pixelsize[self.getPixelFormat(frame[posimage0:posimage0+12*4])]
        overheadimage0 = chunksizeimage0 - widthimage0*heightimage0*pixelsizeimage0
        
        imgtemp = bytearray(frame[overheadimage0:overheadimage0+widthimage0*heightimage0*pixelsizeimage0])
        self.img0 = [imgtemp[2*k] + 256*imgtemp[2*k+1] for k in range(int(widthimage0*heightimage0*pixelsizeimage0/2))]
        self.img0 = np.array(self.img0, dtype=np.uint16)
        self.img0 = self.img0.reshape(heightimage0, widthimage0)  # rows first, as for img1 to img4
        
        chunksizeimage1 = self.getChunkSize(frame[posimage1:posimage1+12*4])
        posimage2 = chunksizeimage0 + chunksizeimage1
        widthimage1 = self.getImageWidth(frame[posimage1:posimage1+12*4]) 
        heightimage1 = self.getImageHeight(frame[posimage1:posimage1+12*4])
        pixelsizeimage1 = self.pixelsize[self.getPixelFormat(frame[posimage1:posimage1+12*4])]
        overheadimage1 = chunksizeimage1 - widthimage1*heightimage1*pixelsizeimage1
        
        imgtemp = bytearray(frame[posimage1+overheadimage1:posimage1+overheadimage1+widthimage1*heightimage1*pixelsizeimage1])
        self.img1 = [imgtemp[2*k] + 256*imgtemp[2*k+1] for k in range(int(widthimage1*heightimage1*pixelsizeimage1/2))]
        self.img1 = np.array(self.img1, dtype=np.uint16)
        self.img1 = self.img1.reshape(heightimage1, widthimage1)
        
        chunksizeimage2 = self.getChunkSize(frame[posimage2:posimage2+12*4])
        posimage3 = chunksizeimage0 + chunksizeimage1 + chunksizeimage2
        widthimage2 = self.getImageWidth(frame[posimage2:posimage2+12*4]) 
        heightimage2 = self.getImageHeight(frame[posimage2:posimage2+12*4])
        pixelsizeimage2 = self.pixelsize[self.getPixelFormat(frame[posimage2:posimage2+12*4])]
        overheadimage2 = chunksizeimage2 - widthimage2*heightimage2*pixelsizeimage2

        imgtemp = bytearray(frame[posimage2+overheadimage2:posimage2+overheadimage2+widthimage2*heightimage2*pixelsizeimage2])
        self.img2 = [imgtemp[2*k] + 256*imgtemp[2*k+1] for k in range(int(widthimage2*heightimage2*pixelsizeimage2/2))]
        self.img2 = np.array(self.img2, dtype=np.int16)
        self.img2 = self.img2.reshape(heightimage2, widthimage2)
        
        chunksizeimage3 = self.getChunkSize(frame[posimage3:posimage3+12*4])
        posimage4 = chunksizeimage0 + chunksizeimage1 + chunksizeimage2 + chunksizeimage3
        widthimage3 = self.getImageWidth(frame[posimage3:posimage3+12*4]) 
        heightimage3 = self.getImageHeight(frame[posimage3:posimage3+12*4])
        pixelsizeimage3 = self.pixelsize[self.getPixelFormat(frame[posimage3:posimage3+12*4])]
        overheadimage3 = chunksizeimage3 - widthimage3*heightimage3*pixelsizeimage3

        imgtemp = bytearray(frame[posimage3+overheadimage3:posimage3+overheadimage3+widthimage3*heightimage3*pixelsizeimage3])
        self.img3 = [imgtemp[2*k] + 256*imgtemp[2*k+1] for k in range(int(widthimage3*heightimage3*pixelsizeimage3/2))]
        self.img3 = np.array(self.img3, dtype=np.int16)
        self.img3 = self.img3.reshape(heightimage3, widthimage3)
        

        chunksizeimage4 = self.getChunkSize(frame[posimage4:posimage4+12*4])
        posimage5 = chunksizeimage0 + chunksizeimage1 + chunksizeimage2 + chunksizeimage3 + chunksizeimage4
        widthimage4 = self.getImageWidth(frame[posimage4:posimage4+12*4]) 
        heightimage4 = self.getImageHeight(frame[posimage4:posimage4+12*4])
        pixelsizeimage4 = self.pixelsize[self.getPixelFormat(frame[posimage4:posimage4+12*4])]
        #self.pixelform = self.getPixelFormat(frame[posimage4:posimage4+12*4])
        overheadimage4 = chunksizeimage4 - widthimage4*heightimage4*pixelsizeimage4

        imgtemp = bytearray(frame[posimage4+overheadimage4:posimage4+overheadimage4+widthimage4*heightimage4*pixelsizeimage4])
        self.img4 = [imgtemp[2*k] + 256*imgtemp[2*k+1] for k in range(int(widthimage4*heightimage4*pixelsizeimage4/2))]
        self.img4 = np.array(self.img4, dtype=np.int16)
        self.img4 = self.img4.reshape(heightimage4, widthimage4)
        
        chunksizeimage5 = self.getChunkSize(frame[posimage5:posimage5+12*4])
        posdiagnostic = chunksizeimage0 + chunksizeimage1 + chunksizeimage2 + chunksizeimage3 + chunksizeimage4 + chunksizeimage5
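
The per-image parsing in trigger repeats the same steps five times and builds the pixel values byte by byte in Python. As a sketch of a more compact alternative, the header can be read with struct and the body with np.frombuffer, which also interprets signed 16-bit values directly. The function parse_chunk and the table PIXEL_DTYPES are our own hypothetical names, the header layout is taken from the getter methods above, and the chunk in the example is synthetic, not real camera data.

```python
import struct

import numpy as np

# NumPy dtype per pixel format code (little-endian); only a subset of the table in Frame
PIXEL_DTYPES = {0: "<u1", 2: "<u2", 3: "<i2", 6: "<f4"}


def parse_chunk(frame, pos):
    """Parse one image chunk starting at byte offset pos; return (image, next_pos)."""
    # Nine unsigned 32-bit fields, in the same order as the getter methods of Frame
    (chunk_type, chunk_size, header_size, header_version,
     width, height, pixel_format, time_stamp, frame_count) = struct.unpack_from("<9I", frame, pos)
    dtype = np.dtype(PIXEL_DTYPES[pixel_format])
    body_size = width * height * dtype.itemsize
    # Same convention as in trigger: everything in the chunk before the raw data is overhead
    body_start = pos + chunk_size - body_size
    image = np.frombuffer(frame, dtype=dtype, count=width * height, offset=body_start)
    return image.reshape(height, width), pos + chunk_size


# Synthetic chunk for demonstration only: a 3x2 image of signed 16-bit values
values = np.array([[-100, 0, 5], [1000, 2000, 4000]], dtype="<i2")
header = struct.pack("<9I", 202, 36 + values.nbytes, 36, 1, 3, 2, 3, 0, 0)
fake_frame = header + values.tobytes()

image, next_pos = parse_chunk(fake_frame, 0)
```

Calling parse_chunk in a loop with the returned next_pos would walk through all chunks of a frame. This variant also avoids converting values above 32767 to np.int16, which recent NumPy versions may reject.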
           

Below you find the code for the 3D image capturing process. It starts by instantiating a Frame object and connecting it to the 3D camera with the method connect. Within a while loop the program receives 3D images by executing the method trigger. The 3D image we use is stored in Frame’s attribute img4. Since the format of each pixel is int16, it may also contain negative numbers, which should not be passed to OpenCV functions. Therefore the code below standardizes the pixel values of the received 3D image; in our scene the resulting values lie in a range between 0 and 127. The values of formax and formin in the code below were determined by experiments. They are the maximum and minimum values of img4 that we observed. In our configuration the 3D camera can capture objects up to a maximum depth of 4m, measured from the camera lens. In our scenario we placed the 3D camera at a distance of 2m from a wall. We chose 127 as the maximum pixel value, which represents the distance of 2m.

current = -1
frame = Frame()
frame.connect(ip, 50010)
formax = 5000
formin = -100
mean = 0

while(True):

    frame.trigger()

    imgshow = ((frame.img4 - formin)/(formax-formin))*255
    if np.amin(frame.img4) < formin:
        print("min value")
        break
    if np.amax(frame.img4) > formax:
        print("max value")
        break          
    
    imgshow = np.array(imgshow, dtype=np.uint8)

    imgresized = cv2.resize(imgshow, (int(176*3.636), int(132*3.636)), interpolation = cv2.INTER_AREA) 

    cv2.imshow('frame3D',imgresized)
    
    count, command, attr = getdata()
    
    if count != current and command == "quit":
        current = count
        break
    if count != current and command == "shoot":
        print("shoot pic")
        cv2.imwrite(os.path.join(data3d, f"{attr}.png"),imgshow)
        current = count
    
    key = cv2.waitKey(10) & 0xFF
    if key == ord('q'):
        break
        
cv2.destroyAllWindows()
frame.close()

The program above continuously resizes the standardized images and displays them on the monitor with the OpenCV function imshow. Inside the while loop it reads the memory map with the inter-process communication function getdata. When the producer process issues a new command (with an increased count value and the command “shoot”), the program saves the standardized 3D image to disk in the “3d” directory (variable data3d).
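
The scaling step can be looked at in isolation. The sketch below uses the formin and formax values from the program above, but it clips out-of-range values instead of leaving the loop; this clipping is our own assumption and not what the program does. The function name scale_depth and the input values are made up for illustration.

```python
import numpy as np

FORMIN = -100   # taken from the article
FORMAX = 5000   # taken from the article


def scale_depth(img, formin=FORMIN, formax=FORMAX):
    """Map raw depth values from [formin, formax] linearly to uint8 values in [0, 255]."""
    # Assumption: clip values outside the range instead of aborting the capture loop
    clipped = np.clip(img.astype(np.float32), formin, formax)
    scaled = (clipped - formin) / (formax - formin) * 255
    return scaled.astype(np.uint8)  # the fractional part is cut off


raw = np.array([[-100, 2450], [5000, 6000]], dtype=np.int16)  # made-up raw values
scaled = scale_depth(raw)
```

With these constants a raw value of 2450 ends up at 127, so all raw values up to roughly that size stay below the limit of 128 that is checked later when the data is processed.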

Image capturing with left and right camera

OpenCV offers the VideoCapture class to capture images from the CSI cameras. We pass it a video stream description created by the function gstreamer, see code below. The function gstreamer builds a GStreamer pipeline string, which uses the nvarguscamerasrc source element with a given sensor_id to identify the left or right camera. In our case we defined the sensor_id to be zero for the left camera and one for the right camera. Further parameters of gstreamer are the image’s width and height, the framerate, and so on.

def gstreamer (sensor_id=0, capture_width=640, capture_height=480, display_width=640, 
     display_height=480, framerate=21, flip_method=2) : 
    stream = f'nvarguscamerasrc sensor-id={sensor_id} ! ' + \
        f'video/x-raw(memory:NVMM), ' + \
        f'width=(int){capture_width}, height=(int){capture_height}, ' + \
        f'format=(string)NV12, framerate=(fraction){framerate}/1 ! ' + \
        f'nvvidconv flip-method={flip_method} ! ' + \
        f'video/x-raw, width=(int){display_width}, height=(int){display_height}, format=(string)BGRx ! ' + \
        f'videoconvert ! ' + \
        f'video/x-raw, format=(string)BGR ! appsink' 
    
    return stream

Below we show how the program instantiates the object cap0 from the class VideoCapture. In this case the left camera is configured. Within a while loop, an image is continuously captured with VideoCapture’s read method. With the getdata function the program checks whether the producer has issued a command. If the producer process issued a “shoot” command, the captured image (frame0) is saved to disk in the “left” directory (variable dataleft).

current = -1

cap0 = cv2.VideoCapture(gstreamer(sensor_id=0), cv2.CAP_GSTREAMER)

while(True):

    ret0, frame0 = cap0.read()
   
    if ret0 == True:
        cv2.imshow('frameLeft', frame0)

    count, command, attr = getdata()
    
    if count != current and command == "quit":
        current = count
        break
    if count != current and command == "shoot":
        print("shoot pic")
        cv2.imwrite(os.path.join(dataleft, f"{attr}.png"),frame0)
        current = count

    key = cv2.waitKey(10) & 0xFF
    if key == ord('q'):
        break

cv2.destroyAllWindows()
cap0.release()

You find here only the code for the left camera. The code for the right camera is essentially the same; however, the sensor_id is set to one, and the captured images are saved in the “right” directory (variable dataright).

Processing the training data

Above we have described the code to capture images from the left and right camera and 3D images from the 3D camera. We run three programs separately in three processes to overcome the problem with badly synchronized cameras. Also we have shown the code to trigger (producer) the capturing of all images by issuing a command via memory map. In Picture 3 you see an example of captured images of the left camera, 3D camera, and right camera.

Note that the left image and the right image are shifted. This is due to the fact that the CSI cameras are mounted with 8cm distance from each other. The grayscale image in the middle of Picture 3 is the 3D image. Pixels close to the camera are displayed dark, and pixels further away (in this case wall) are lighter.

Picture 3: Left image, 3D image, right image

We took about 3000 images of different scenarios, mostly of persons. Unfortunately this took some time, because the 3D camera showed a delay of about half a second. So we had to wait until the person held still before capturing the three images at the same time. All images were saved in three different directories, given by the variables dataright, dataleft and data3d.

The program below loads in the names of images into the string lists data3dlist, dataleftlist and datarightlist. All three string lists must have exactly the same image names, because each image capturing process stores images with the same name into the “left”, “right” and “3d” directories.

trainleft = os.path.join(path, r"train\left")
trainright = os.path.join(path, r"train\right")
train3d = os.path.join(path, r"train\3d")

validleft = os.path.join(path, r"valid\left")
validright = os.path.join(path, r"valid\right")
valid3d = os.path.join(path, r"valid\3d")

testleft = os.path.join(path, r"test\left")
testright = os.path.join(path, r"test\right")
test3d = os.path.join(path, r"test\3d")

dataleftlist = os.listdir(dataleft)
datarightlist = os.listdir(dataright)
data3dlist = os.listdir(data3d)
 
shuffle(data3dlist)

num = 20*len(data3dlist) // 100
validlist = data3dlist[:num]
testlist = data3dlist[num:num+num//10]
trainlist =  data3dlist[num+num//10:]

for name in data3dlist:
    img = cv2.imread(os.path.join(data3d,name), cv2.IMREAD_GRAYSCALE)
    img = np.array(img, dtype=np.uint8)
    assert(img.max() < limit)

The program above shuffles the string name list in data3dlist. Around 20 percent of the image names are stored into the list validlist, two percent of the image names into the list testlist and the remaining names into the list trainlist.
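
To make the proportions concrete, here is a small sketch of the same split arithmetic applied to 3000 names. The function split_names, the seed parameter and the file names are our own assumptions for illustration; the slicing is the same as in the program above.

```python
from random import Random


def split_names(names, seed=None):
    """Shuffle names and split them into training, validation and testing lists."""
    names = list(names)
    Random(seed).shuffle(names)
    num = 20 * len(names) // 100            # about 20 percent for validation
    validlist = names[:num]
    testlist = names[num:num + num // 10]   # a tenth of that, about 2 percent, for testing
    trainlist = names[num + num // 10:]     # the rest for training
    return trainlist, validlist, testlist


# Hypothetical file names; the article's names are built from a date and time string
names = [f"img_{k:04d}.png" for k in range(3000)]
trainlist, validlist, testlist = split_names(names, seed=0)
sizes = (len(trainlist), len(validlist), len(testlist))
```

For 3000 images this gives 2340 training, 600 validation and 60 testing images, and no name appears in more than one list.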

After executing the code below, the images are stored into the training, validation and testing directories. The program is iterating through the lists trainlist, validlist and testlist, reads in the images, and copies them to the training directory (“left”, “right”, “3d”), validation directory (“left”, “right”, “3d”) and testing directory (“left”, “right”, “3d”).

Note that after reading in the left and right images, a square section of size 360×360 is taken from the images and resized to 128×128. From the 3D images, a 128×128 section is cut out of the upper right corner without resizing.

def cropandsave(namelist, out3d, outright, outleft):
    # Crop one image triple per name and write it to the given output directories
    for name in namelist:
        # 3D image: cut a section of size dim out of the upper right corner
        img = cv2.imread(os.path.join(data3d, name), cv2.IMREAD_GRAYSCALE)
        img_sqr = img[0:dim[0], img.shape[1] - dim[1]:img.shape[1]]
        cv2.imwrite(os.path.join(out3d, name), img_sqr)

        # right and left image: cut a 360x360 section and resize it to dim
        for srcdir, outdir in ((dataright, outright), (dataleft, outleft)):
            img0 = cv2.imread(os.path.join(srcdir, name), cv2.IMREAD_COLOR)
            img0 = img0[60:420, 120:480, :]
            img0 = cv2.resize(img0, dim, interpolation=cv2.INTER_AREA)
            cv2.imwrite(os.path.join(outdir, name), img0)

cropandsave(trainlist, train3d, trainright, trainleft)
cropandsave(validlist, valid3d, validright, validleft)
cropandsave(testlist, test3d, testright, testleft)

The neural network

We decided to use the UNet neural network to predict 3D images from the left and right images. We already have good experience with UNets from past projects. The UNet was explained in past posts and can be found here. The code below was originally written by Tobias Sterbak and can be found here. We will not explain it in detail again, apart from one thing that we modified. The last layer of the UNet is a convolutional layer, which is assigned to the variable c10. The depth of this layer is 16, so the UNet model outputs a container of 16 stacked images with a size of 128×128 each. As described above, the depth images created by the 3D camera have pixel values between 0 and 127. If we used this full range, we would need a UNet model that generates 128×128 images with 128 stacks (each stack indicates a distance from the camera). These are huge data containers and would exceed the graphics card’s capacity during training. For this reason we limit the size of the predicted images to 128x128x16.

def conv2d_block(input_tensor, n_filters, kernel_size=3, batchnorm=True):
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), kernel_initializer="he_normal",
               padding="same")(input_tensor)
    if batchnorm:
        x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), kernel_initializer="he_normal",
               padding="same")(x)
    if batchnorm:
        x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

def get_unet(input_img, n_filters=16, dropout=0.5, batchnorm=True):
    c1 = conv2d_block(input_img, n_filters=n_filters*1, kernel_size=3, batchnorm=batchnorm)
    p1 = MaxPooling2D((2, 2)) (c1)
    p1 = Dropout(dropout*0.5)(p1)
    
    c2 = conv2d_block(p1, n_filters=n_filters*2, kernel_size=3, batchnorm=batchnorm)
    p2 = MaxPooling2D((2, 2)) (c2)
    p2 = Dropout(dropout)(p2)

    c3 = conv2d_block(p2, n_filters=n_filters*4, kernel_size=3, batchnorm=batchnorm)
    p3 = MaxPooling2D((2, 2)) (c3)
    p3 = Dropout(dropout)(p3)

    c4 = conv2d_block(p3, n_filters=n_filters*8, kernel_size=3, batchnorm=batchnorm)
    p4 = MaxPooling2D(pool_size=(2, 2)) (c4)
    p4 = Dropout(dropout)(p4)
    
    c5 = conv2d_block(p4, n_filters=n_filters*16, kernel_size=3, batchnorm=batchnorm)
    
    u6 = Conv2DTranspose(n_filters*8, (3, 3), strides=(2, 2), padding='same') (c5)
    u6 = concatenate([u6, c4])
    u6 = Dropout(dropout)(u6)
    c6 = conv2d_block(u6, n_filters=n_filters*8, kernel_size=3, batchnorm=batchnorm)

    u7 = Conv2DTranspose(n_filters*4, (3, 3), strides=(2, 2), padding='same') (c6)
    u7 = concatenate([u7, c3])
    u7 = Dropout(dropout)(u7)
    c7 = conv2d_block(u7, n_filters=n_filters*4, kernel_size=3, batchnorm=batchnorm)

    u8 = Conv2DTranspose(n_filters*2, (3, 3), strides=(2, 2), padding='same') (c7)
    u8 = concatenate([u8, c2])
    u8 = Dropout(dropout)(u8)
    c8 = conv2d_block(u8, n_filters=n_filters*2, kernel_size=3, batchnorm=batchnorm)

    u9 = Conv2DTranspose(n_filters*1, (3, 3), strides=(2, 2), padding='same') (c8)
    u9 = concatenate([u9, c1], axis=3)
    u9 = Dropout(dropout)(u9)
    c9 = conv2d_block(u9, n_filters=n_filters*1, kernel_size=3, batchnorm=batchnorm)
    
    c10 = Conv2D(depth, (1, 1), activation="softmax") (c9)
    
    model = Model(inputs=[input_img], outputs=[c10])
    return model

If we divide all 3D camera images by 8, we limit the pixel value range to 0 to 15. For training, however, we need to convert the 3D images with a 128×128 shape (and a 0 to 15 pixel range) to images with a 128x128x16 shape. The pixel values will then only be zero or one. The image converting function is shown below. A for loop iterates through all pixel values (from 0 to 15) and writes zeros or ones into the corresponding image stack of the returned image.

def makecolormask(mask):

    ret_mask = np.zeros((mask.shape[0], mask.shape[1], depth), 'uint8')
    
    for c in range(depth):
        ret_mask[:, :, c] = (mask == c).astype(int)
        
    return ret_mask
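
The following sketch shows the conversion on a tiny example and also the way back from a 16-stack image to a gray image, which is needed to display a prediction. The function names one_hot and decode_prediction are hypothetical, and the decoding step (argmax over the stacks, multiplied by 8) is our own assumption about how the model output can be turned back into the original pixel scale.

```python
import numpy as np

depth = 16  # number of stacks, as in the article


def one_hot(mask, depth=depth):
    """Same result as makecolormask, built with a lookup in an identity matrix."""
    return np.eye(depth, dtype=np.uint8)[mask]


def decode_prediction(pred):
    """Turn a (height, width, depth) score array back into a gray image with values 0 to 120."""
    classes = np.argmax(pred, axis=-1).astype(np.uint8)  # stack with the highest score per pixel
    return classes * 8                                   # undo the division by 8


mask = np.array([[0, 15], [3, 7]], dtype=np.uint8)  # tiny made-up mask with values 0 to 15
encoded = one_hot(mask)
decoded = decode_prediction(encoded)
```

Because of the integer division by 8, the decoded image only contains multiples of 8, so the depth resolution of a prediction is coarser than that of the captured 3D image.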

Training

We use generators to load batches of images during training. Due to the size of the training images (especially the 3D images), we would easily exceed the memory capacity of the graphics card and the computer, so the training images are loaded in batches from disk during the training process. The function generatebatchdata has already been described here. Note that this function divides the 3D image pixel values by 8 to reduce the pixel value range to 0 to 15. You also find the function makecolormask used in generatebatchdata. To have only one input image container, the left and right images are stacked together into a single 128x128x6 image. More about it below.

def generatebatchdata(batchsize, inputpath):
  
    imagenames = os.listdir(os.path.join(path, inputpath, "3d"))

    shuffle(imagenames)

    while True:
        batchstart = 0
        batchend = batchsize    
        
        while batchstart < len(imagenames):
            
            imagelist = []
            masklist = []
            
            limit = min(batchend, len(imagenames))

            for i in range(batchstart, limit):
                if imagenames[i].endswith(".png"):
                    img0 = cv2.imread(os.path.join(path, inputpath, "right",imagenames[i]),cv2.IMREAD_COLOR )
                    img1 = cv2.imread(os.path.join(path, inputpath, "left",imagenames[i]),cv2.IMREAD_COLOR )
                    
                    img = np.zeros((dim[0], dim[1], img0.shape[2]+img1.shape[2]), 'uint8')
                    img[:,:,0:img0.shape[2]] = img0[:,:,:]
                    img[:,:,img0.shape[2]:img0.shape[2]+img1.shape[2]] = img1[:,:,:]
                    
                    imagelist.append(img)

                    mask = cv2.imread(os.path.join(path, inputpath, "3d",imagenames[i]),cv2.IMREAD_UNCHANGED )
                    mask //= 8
                    masklist.append(makecolormask(mask))


            train_data = np.array(imagelist, dtype=np.float32)
            train_mask= np.array(masklist, dtype=np.float32)

            train_data -= train_data.mean()
            train_data /= train_data.std()
            
            yield (train_data,train_mask)    

            batchstart += batchsize   
            batchend += batchsize
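
The stacking and standardization steps of the generator can also be looked at in isolation. The following is a minimal sketch, not the generator itself: random arrays stand in for the camera images, and np.concatenate is used as an equivalent alternative to the slice assignments above.

```python
import numpy as np

dim = (128, 128)  # image size, assumed to match the value used in this post

# Random stand-ins for the right and left RGB camera images.
rng = np.random.default_rng(0)
img_right = rng.integers(0, 256, size=(dim[0], dim[1], 3), dtype=np.uint8)
img_left = rng.integers(0, 256, size=(dim[0], dim[1], 3), dtype=np.uint8)

# Stack along the channel axis: channels 0 to 2 are right, channels 3 to 5 are left.
stacked = np.concatenate([img_right, img_left], axis=2)

# A batch of one image, standardized over the whole batch as in generatebatchdata.
batch = np.array([stacked], dtype=np.float32)
batch -= batch.mean()
batch /= batch.std()
```

Because mean and standard deviation are computed per batch, the same raw image can get slightly different values depending on which other images share its batch.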

During training, the model is fed with input images, which are the left and right images captured by the CSI cameras. Both images have the size 128×128×3, since they are color images with three channels. To feed both images into the model in one shot, we stack them into 128×128×6 images. The first line of the code below shows that the model is configured with an input size of 128×128×6. The remaining code, such as the callbacks, has already been described in previous posts.

input_img = Input((dim[0], dim[1], 6), name='img')
model = get_unet(input_img, n_filters=depth, dropout=0.0, batchnorm=True)
model.compile(optimizer=Adam(), loss="categorical_crossentropy", metrics=["accuracy"])

callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ModelCheckpoint(os.path.join(path, dirmodels,"model-chkpt.h5"), verbose=1, save_best_only=True, save_weights_only=True)
]

The program below instantiates the two generators generator_train and generator_valid. We use a batch size of 20. Training is started with the fit_generator command (in Keras this function is deprecated by now, so you can replace it with the fit command).

batchsize = 20

generator_train = generatebatchdata(batchsize, "train")
generator_valid = generatebatchdata(batchsize, "valid")

model.fit_generator(generator_train,trainsize//batchsize, epochs=20, callbacks=callbacks, validation_data=generator_valid, validation_steps=validsize//batchsize)

In a Jupyter notebook we can execute the above portion of code several times in a row, so we ran the training for 80 epochs in total. At the end we had a validation accuracy of 82%. Considering that we have just 3000 training images (minus 20 percent validation images), this result seems acceptable. We stopped the training after 80 epochs because we could not see any further improvement.
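
The step counts passed to the training call follow directly from these numbers. The sketch below works through the arithmetic, assuming that exactly 20 percent of the 3000 images were set aside for validation; the real values of trainsize and validsize are not shown in this post.

```python
total_images = 3000        # number of images stated in the text
batchsize = 20

validsize = total_images * 20 // 100        # assumed 20 percent split: 600
trainsize = total_images - validsize        # 2400

steps_per_epoch = trainsize // batchsize    # batches per training epoch
validation_steps = validsize // batchsize   # batches per validation pass
```

With these assumptions one epoch consists of 120 training batches and 30 validation batches.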

Testing on a Jetson Nano

We ran the training on an NVIDIA 2070 graphics card and saved the weights to a file named modelweightname. For testing, we use the Jetson Nano on the same experimental setup that has been described above. Below you see how the model is loaded from a JSON file and how the weights are loaded into the model.

json_file = open(os.path.join(path, models,modeljsonname), 'r')
loaded_model_json = json_file.read()
json_file.close()
model = model_from_json(loaded_model_json)
model.load_weights(os.path.join(path, models,modelweightname))

Essentially we use code similar to the above to capture left and right images from the CSI cameras with the VideoCapture class. The code below instantiates the objects outle, outri and out3d of the VideoWriter class to write videos from the captured images and from the predicted 3D image.

Within the while loop the program captures images from the left and right CSI cameras, extracts a square section from each and resizes it. The two images are then stacked together, standardized and passed to the trained model for prediction. The output of a prediction is a 128×128×16 image, which is converted to a 3D image with the numpy reshape and argmax functions. The left, right and 3D images are then written to the video streams.

cap0 = cv2.VideoCapture(gstreamer(sensor_id=0), cv2.CAP_GSTREAMER)
cap1 = cv2.VideoCapture(gstreamer(sensor_id=1), cv2.CAP_GSTREAMER)

out3d = cv2.VideoWriter(os.path.join(path,"videos",'video3d.mp4'),cv2.VideoWriter_fourcc(*'MP4V'),10, dim)  
outle = cv2.VideoWriter(os.path.join(path,"videos",'videole.mp4'),cv2.VideoWriter_fourcc(*'MP4V'), 10, dim)
outri = cv2.VideoWriter(os.path.join(path,"videos",'videori.mp4'),cv2.VideoWriter_fourcc(*'MP4V'), 10, dim) 

img0 = None
img1 = None
predictions_test = None

while(True):
    ret0, frame0 = cap0.read()
    ret1, frame1 = cap1.read()
 
    if ret0 == True and ret1 == True:


        cv2.imshow('left', frame0)
        cv2.imshow('right', frame1)
        
        img0 = np.array(frame1, dtype=np.uint8)
        img0 = img0[60:420,120:480,:]
        img0 = cv2.resize(img0, dim, interpolation = cv2.INTER_AREA)
        
        outri.write(img0)
        
        img1 = np.array(frame0, dtype=np.uint8)
        img1 = img1[60:420,120:480,:]
        img1 = cv2.resize(img1, dim, interpolation = cv2.INTER_AREA)
        
        outle.write(img1)
        
        img = np.zeros((dim[0], dim[1], img0.shape[2]+img1.shape[2]), 'uint8')            
        img[:,:,0:img0.shape[2]] = img0[:,:,:]
        img[:,:,img0.shape[2]:img0.shape[2]+img1.shape[2]] = img1[:,:,:]
        
        imagetestlist = []
        imagetestlist.append(img)
        test_data = np.array(imagetestlist, dtype=np.float32)
        test_data -= test_data.mean()
        test_data /= test_data.std()
        
        prediction = model.predict(test_data, batch_size=1, verbose=0)[0]
        
        mymask = prediction.reshape((dim[0], dim[1], depth)).argmax(axis=2)
        
        mymask *= 8
        mymask =  np.array(mymask, dtype=np.uint8)
        
        out3d.write(cv2.cvtColor(mymask,cv2.COLOR_GRAY2RGB))
        
        mask = np.zeros(((2*dim[0], 2*dim[1])), 'uint8')
        mask = cv2.resize(mymask, (2*dim[0], 2*dim[1]), interpolation = cv2.INTER_AREA)
        
        cv2.imshow('3d',mask)   
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    else:
        print("camera not working")
        break

cap0.release()
cap1.release()
cv2.destroyAllWindows()
out3d.release()
outle.release()
outri.release()
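
The conversion from the model output back to a 3D image can be tried without a camera or a trained model. The sketch below is only an illustration under that assumption: a random array stands in for one prediction, and the values of dim and depth are taken from the text.

```python
import numpy as np

dim = (128, 128)
depth = 16

# Random stand-in for one model output: per-pixel scores over 16 depth classes.
rng = np.random.default_rng(0)
prediction = rng.random((dim[0], dim[1], depth)).astype(np.float32)

# Pick the most likely class per pixel, then scale back towards the raw range.
classes = prediction.argmax(axis=2)            # values 0 to 15, shape (128, 128)
depth_image = (classes * 8).astype(np.uint8)   # multiples of 8, at most 120
```

Since the training masks were divided by 8 with integer division, the restored image only contains multiples of 8; the original maximum of 127 comes back as 120.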

Above we described a problem with unsynchronized capturing of images from the left and right CSI cameras. The code above does not show the problem we had before. We assume this is due to the very slow inference time of the 3D image prediction: both cameras seem to catch up, because the real bottleneck here is the prediction.

Results

Picture 4 shows images from the left, right and 3D camera videos. Note that the left and right images are shifted, since both cameras are mounted 8 cm apart. In the middle of Picture 4 you see the predicted 3D image. The hand on the right side of the 3D image seems to be cut off. This is probably because the left camera did not capture the complete hand.

Picture 4: Close shot

In Picture 5 you see a person leaning against the wall. The wall pixels should have values around 127. In the text above we described that the furthest distance we work with is 2 m, which is represented by the pixel value 127. In the middle of Picture 5 you see that the person is displayed slightly darker than the wall. This is expected. But the hand on the left side is actually too dark.

Picture 5: Far shot

In Picture 6 you see a person stretching out his hands towards the cameras. In the middle of Picture 6, the hands are displayed with dark pixels, while the body is displayed with lighter pixels. This is expected. On the right side of the 3D image, there is a section which has simply been left out and shows wall distance pixels.

Picture 6: Combination of close shot and far shot

Conclusion and Outlook

The pictures in the results section show that 3D predictions are possible with a trained model. The results you find here are of a qualitative nature: we can recognize whether a person is close to or far away from the camera. So far we have not made any quantitative tests that relate a pixel value to the real distance of an object from the camera’s lens.

The 3D images are predicted by a trained model. We mentioned above that we only used 3000 images and had a validation accuracy of 82%. The validation accuracy can probably be improved if we use many more images for training.

The imperfect accuracy can be seen in the pictures of the results section. In one case, a hand was missing. In another case, a hand was displayed as if it were close to the camera, though it was leaning against the wall. We assume that these imperfections can be reduced with more training data.

To our surprise, the performance of the Jetson Nano is not too good with the model we use. We made sure that the NVIDIA chip was used during prediction. Despite the graphics chip, the inference time for predicting a 3D image is too high. The predicted 3D images are not displayed in real time; you always see a delay of several seconds. For further improvement, we might need to remove more layers from the model to reduce the number of calculations.

Another topic that should be investigated in more detail is whether a convolutional neural network is really suited to this kind of problem. We need to show whether the shift between the left and right camera images has any effect at all. A convolutional layer only considers directly neighboring pixels (depending on its kernel size, which is often just 3×3), and not pixels which are further apart, as in our case with the left and right pictures. More investigation should be made to take this shift into account, maybe with fully connected layers or with larger kernels.
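
One possible starting point for this investigation is to calculate the theoretical receptive field of the network, i.e. how many input pixels can influence one output pixel. The sketch below does this for a list of (kernel size, stride) pairs. The layer list is a hypothetical encoder and only an assumption; it is not read from the model in this post, and the region that effectively contributes in a trained network is usually smaller than the theoretical value.

```python
def receptive_field(layers):
    """Return the receptive field (in input pixels) of a stack of layers.

    layers is a list of (kernel_size, stride) tuples, ordered from input to output.
    """
    size = 1   # receptive field of a single input pixel
    jump = 1   # distance in input pixels between neighbouring outputs
    for kernel_size, stride in layers:
        size += (kernel_size - 1) * jump
        jump *= stride
    return size


conv = (3, 1)   # 3x3 convolution, stride 1
pool = (2, 2)   # 2x2 max pooling, stride 2

# Hypothetical encoder: four levels of two convolutions plus pooling,
# followed by two convolutions at the bottleneck.
encoder = [conv, conv, pool] * 4 + [conv, conv]

single = receptive_field([conv])   # one 3x3 convolution alone
deep = receptive_field(encoder)    # the whole hypothetical encoder
```

For this assumed layer list the function returns 140 pixels, compared with 3 pixels for a single 3×3 convolution. Whether the real model actually uses such a wide context to match the left and right images would still have to be tested.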

Acknowledgement

Special thanks to the University of Applied Science Albstadt-Sigmaringen for providing the infrastructure and the appliances to enable this research.

Deep Reinforcement Learning with the Snake Game

Here we want to show our achievements and failures with deep reinforcement learning. The paper [1] describes an algorithm for training on Atari games such as Breakout. Stanford University taught this algorithm in lecture 14 (reinforcement learning) of its deep learning class. Since the results looked pretty promising, we tried to recreate the methods with our own implementation. We decided to try it with another game, the snake game. Here the player controls a snake moving on the screen with four action keys (RIGHT, LEFT, UP, DOWN). The player has to steer the snake so that it eats food, and has to avoid the frame’s borders: if the snake runs out of the frame, it dies and the game terminates (Picture 1 shows a snake that is about to die). Each time the snake eats food, its body length increases. During the game the player also has to avoid the snake’s own body, otherwise the game terminates as well.

Picture 1: Snake Game: image of the game window

Picture 2 shows how the snake moves from one position to the next towards the food (white box). The snake game we used was created by Rajat Biswas and can be downloaded from here. However, to access the images of the game and the action keys, we had to modify the code, which is copied into this post. The modifications were so substantial that the code might not be recognizable anymore, with the exception of some variable names.

Picture 2: Three sequential images

Basic Setups

In the beginning we tried a frame size of 100×100 for each image. We soon found that the graphics card reached its capacity and the computer crashed. The authors of the paper [1] used a frame size of 84×84; after our crashes, however, we tried 50×50. The objects of the game (snake, food) were still recognizable in the images after resizing to 50×50, so we kept this frame size, see dim in the code below.

The snake in Rajat’s game was drawn in green. This leads to a problem: reinforcement learning is based on the theory of the Markov Decision Process (MDP). In an MDP we are dealing with states, and the current state tells you which action to take next. Only the current state is relevant, not the past states. If you take a look at the snake game in Picture 1, you find that we have drawn the snake’s head in magenta (R: 255, G: 0, B: 255). Now the player always knows where the head and the tail of the snake are by looking at just one image. In the original game the creator colored the complete snake green. In the Atari example [1] the authors faced a similar problem: from a single image they could not tell from which direction the flying ball was coming. They solved it by capturing four sequential images and adding their grayscale versions as the layers of a new image. This made the dynamics of the flying ball visible.

A good lecture on Markov Decision Processes can be found here. It is highly recommended to take a look at MDP theory before programming anything in reinforcement learning; otherwise it will be hard to understand the code.

The code below shows some necessary directories, such as the data directory, which contains a large sample of images of the game (also called replay memory), and the model directory, which contains the model’s checkpoint with the name modelweightname.

Rajat’s game used the string identifiers RIGHT, LEFT, UP, DOWN to indicate a keystroke. The dictionaries actionstonum and numtoactions convert the string identifiers to numbers, and vice versa.

dim = (50,50) 
pathname = r"/home/inf/Dokumente/Reinforcement"
datadirname = "data"
validdirname = "valid"
modeldirname = "model"
datacsvname = "data.csv"
modeljsonname="model-regr.json"
modelweightname="model-regr.h5"

actionstonum = {"RIGHT": 0,
           "LEFT": 1,
           "UP" : 2,
           "DOWN" : 3,
          }
numtoactions = {0: "RIGHT",
           1: "LEFT",
           2: "UP",
           3: "DOWN",
          }

The Model

The model originally comes from [1]; however, we copied it from the Keras reinforcement learning website, because the authors did not describe the model thoroughly enough to reconstruct it. The function create_q_model is copied in the code section below. It consists of five neural network layers: three convolution layers and two fully connected layers. The last layer is a fully connected layer with four outputs. Each output represents the Q state of one action from the action space RIGHT, LEFT, UP, DOWN.

A Q state is a variable which tells us how much food the snake will eat in the future if a certain action is taken. In our program, eating food gives the player a reward of one. An example: assume the Q state for the action RIGHT is two. This means that if the player chooses the action RIGHT, the prediction is that the snake eats food two more times.

def create_q_model():

    inputs = layers.Input(shape=(dim[0], dim[1], 3,))

    layer1 = layers.Conv2D(32, 8, strides=4, activation="relu")(inputs)
    layer2 = layers.Conv2D(64, 4, strides=2, activation="relu")(layer1)
    layer3 = layers.Conv2D(64, 3, strides=1, activation="relu")(layer2)

    layer4 = layers.Flatten()(layer3)

    layer5 = layers.Dense(512, activation="relu")(layer4)
    action = layers.Dense(4, activation="linear")(layer5)

    return keras.Model(inputs=inputs, outputs=action)
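
To see what the 50×50 frame size means for this model, the sketch below works out the spatial size after each convolution. It assumes the Keras default of no padding ("valid"), which is what Conv2D uses when no padding argument is given, as in create_q_model.

```python
def conv_output_size(size, kernel_size, stride):
    # Output length of a convolution without padding ("valid").
    return (size - kernel_size) // stride + 1


size = 50                      # input frames are 50x50
sizes = []
for kernel_size, stride in [(8, 4), (4, 2), (3, 1)]:
    size = conv_output_size(size, kernel_size, stride)
    sizes.append(size)

flat_features = sizes[-1] * sizes[-1] * 64   # input width of the Dense(512) layer
```

The three convolutions reduce the frame to 11×11, 4×4 and finally 2×2, so the Flatten layer hands 2·2·64 = 256 values to the first fully connected layer.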

Breaking down the code

We have written the main code in the class Game, into which we incorporated Rajat’s code. By now his code is probably very hard to recognize. The class Game became pretty large, so we need to break it down. Below you see the class definition, the constructor __init__ and the method initialize.

The constructor sets the frame size of the game to 200×200. Later, the frame images are resized to 50×50. The game needs four colors: black, white, green and magenta. The variables imgresh1 and imgresh2 are two sequential images of the game, e.g. two images in Picture 2. The first image is taken before the player takes an action and the second image is taken after the player has taken the action. Basically these two images represent the current state and the future state. The constant MAXREWARD is assigned to reward as soon as the snake eats food. The program assigns PENALTY to penalize the player when the snake hits the border or its own body. If an action just moves the snake, MOVEPENALTY is assigned to reward. We have actually set this to zero, so it has no effect.
We have set the BATCHSIZE of the training data to 20. The code below actually shows the number 19, but one more entry is added during training, which makes it 20.
DISCOUNT, ALPHA and EPSILON are values of the Bellman equation. This is part of the MDP theory, so we skip the explanation here; studying the MDP lecture is really recommended. REPLAYSIZE is explained in [1], and we took the concept from there. It is the maximum number of current and future images stored in the replay memory. So the code here stores 40,000 current and future images on disk and uses these images to retrain the model. Two models are created with the create_q_model function: model and model_target. model is the current model, and model_target is the future model. The current model is retrained all the time during the game, and the future model is updated once after numerous games have been played. We use the Adam optimizer and the Huber loss function. Huber is a combination of a square function and an absolute function. It punishes large differences less than a square function does. The remaining important code in the constructor loads the weights into model_target.
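
The difference between the Huber loss and a squared loss is easy to see with two numbers. The following is a plain Python sketch of the per-sample formula, not the Keras implementation; delta=1.0 is the Keras default, which the constructor below uses because it passes no argument to keras.losses.Huber.

```python
def huber(error, delta=1.0):
    # Quadratic for small errors, linear for large ones.
    if abs(error) <= delta:
        return 0.5 * error ** 2
    return delta * (abs(error) - 0.5 * delta)


def squared(error):
    # Squared loss with the same 0.5 factor, for comparison.
    return 0.5 * error ** 2


small = (huber(0.5), squared(0.5))   # identical inside the quadratic region
large = (huber(3.0), squared(3.0))   # Huber grows only linearly out here
```

For an error of 0.5 both functions give 0.125, while for an error of 3.0 the Huber loss is 2.5 and the squared loss is 4.5.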

The method initialize initializes the pygame environment, which is part of the library we used in the code. Here the clock is set, and the frame size is set to 200×200. The first positions of the snake (snake_pos) and of the food (food_pos) are set randomly. The variable changeto holds the current action (RIGHT, LEFT, UP, DOWN) chosen by the player. The variable direction mostly holds the same value as changeto, but in some cases it does not. E.g. if the snake runs right and the player hits left, the snake continues to run right. changeto then holds the value left, but direction holds the value right. The method initialize finishes by loading the weights into the current model model.

class Game:

    def __init__(self, lr=1e-3, checkpointparname=modelweightname):
        
        self.speed = 80
        self.frame_size_x = 200
        self.frame_size_y = 200

        self.black = pygame.Color(0, 0, 0)
        self.white = pygame.Color(255, 255, 255)
        self.green = pygame.Color(0, 255, 0)
        self.mag = pygame.Color(255, 0, 255)
        
        self.imgresh1 = None
        self.imgresh2 = None
        
        self.reward = 0
        self.MAXREWARD = 1.0
        self.PENALTY = -1.0
        self.MOVEPENALTY = 0.0
        
        self.BATCHSIZE = 19

        self.DISCOUNT = 0.99
        self.ALPHA = 0.3
        if manual == True:
            self.EPSILON = 0.999
        else:
            self.EPSILON = 0.3
        
        self.REPLAYSIZE = 40_000
        self.overall_score = 0
        self.overall_numbatches = 0

        self.model = create_q_model()
        self.model_target = create_q_model()

        self.learningrate = lr
        self.optimizer = keras.optimizers.Adam(learning_rate=self.learningrate, clipnorm=1.0)
        self.loss_function = keras.losses.Huber()

        self.checkpointname = os.path.join(pathname, modeldirname,checkpointparname)
        print(f"loading checkpoint: {self.checkpointname}")
        self.model_target.load_weights(self.checkpointname)
        
        self.overall_scores=[]
        self.checkpoint_counter=0
        
        self.shufflelist = []
        
    def initialize(self, i, j):

        status = pygame.init()

        if status[1] > 0:
            print(f'Number of Errors: {status[1]} ...')
            sys.exit(-1)

        pygame.display.set_caption(f"{i}-{j}")
        self.game_window = pygame.display.set_mode((self.frame_size_x, self.frame_size_y)) 

        self.controller = pygame.time.Clock()
   
        posx = (random.randint(40,160)//10)*10
        posy = (random.randint(40,160)//10)*10
           
        self.snake_pos = [posx, posy]
        self.snake_body = [[posx, posy], [posx-10, posy], [posx-(2*10), posy]]

        self.food_pos = [random.randrange(1, (self.frame_size_x//10)) * 10, random.randrange(1, (self.frame_size_y//10)) * 10]
        self.food_spawn = True

        self.direction = 'RIGHT'
        self.changeto = self.direction

        self.score = 0
        self.numbatches = 0

        self.event_happened = False
        
        self.model.load_weights(self.checkpointname)

The code below shows the methods run, get_maxi, game_over and get_direction of the class Game.

The method run executes the snake game iteration by iteration. In each iteration it captures the current image, checks for a key stroke, updates the graphical objects such as the snake head, the snake body and the food, and draws the next image. While executing run, the images are saved to disk with a unique number for identification. To prevent overwriting images, the method get_maxi reads the maximum identification number, and the counter i is set based on it.
At the beginning, the method run saves the first image of the game window to the OpenCV image imgresh1. We use the numpy methods frombuffer and reshape to retrieve the image and the OpenCV method resize to reduce the image size to 50×50. The image imgresh1 is then passed to the neural network model to get a prediction of the four Q states. The index of the largest of the four Q states (TensorFlow method argmax) gives the predicted action. It must be a number between zero and three. This is also part of the Bellman equation from the MDP theory.
The run method reads the key strokes and moves the information into the attribute changeto. With the help of the constant EPSILON, we bring some variation between prediction and randomness or direction into the game. In the code below the predicted action is used only when a random number between zero and one is greater than EPSILON. So if EPSILON is close to zero, the run method is mostly on the prediction side (exploitation). If EPSILON is close to one, it is mostly on the randomness/direction side (exploration). Direction is given by the method get_direction. Again, the constant EPSILON is also part of the MDP theory.
As mentioned above, changeto is an attribute that tells you in which direction the player wants to move the snake. However, if e.g. the snake moves right and the player strikes the left key, the snake should still move right. For this we need the attribute direction. It holds the actual direction of the snake, independent of the key stroke.
The method run then checks whether the snake has eaten the food. If yes, a new food position is drawn, the attribute reward is set to MAXREWARD and the body of the snake is enlarged. If neither food nor border has been hit, then the attribute reward is set to MOVEPENALTY. In case the snake hits the border or its own body, the attribute reward is set to PENALTY. This is done in the method game_over.
Finally the snake is redrawn and an image of the game window is moved into imgresh2. The images imgresh1 and imgresh2 form the current state and the future state.
At the end, the method run retrains the model with the method train. We do this only for every fourth pair of current state and future state, and additionally whenever the snake has eaten food. This emphasizes the scoring and the terminations and de-emphasizes the plain moves.

If the snake hits the border, the method game_over is executed and it terminates the method run. Here the model is retrained as well with the current state (imgresh1), future state (imgresh2), action taken (changeto), reward (reward) and the termination information.

class Game:

    # constructor and initializer, see above
      
    def run(self, i_index):
        
        i = i_index + self.get_maxi() + 1
        j = 0

        while True:
            
            img1 = np.frombuffer(pygame.image.tostring(self.game_window, "RGB"), dtype=np.uint8)
            self.imgresh1 = np.reshape(img1,(self.frame_size_x,self.frame_size_y, 3))
            self.imgresh1 = cv2.resize(self.imgresh1, dim, interpolation = cv2.INTER_NEAREST )

            current_state = np.array(self.imgresh1, dtype=np.float32)/255.0
            state_tensor = tf.convert_to_tensor(current_state)
            state_tensor = tf.expand_dims(state_tensor, 0)
            action_probs = self.model(state_tensor, training=False)
            theaction = tf.argmax(action_probs[0]).numpy()
            
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    return
                # Whenever a key is pressed down
                elif event.type == pygame.KEYDOWN:

                    if event.key == pygame.K_UP or event.key == ord('w'):
                        self.changeto = 'UP'
                    if event.key == pygame.K_DOWN or event.key == ord('s'):
                        self.changeto = 'DOWN'
                    if event.key == pygame.K_LEFT or event.key == ord('a'):
                        self.changeto = 'LEFT'
                    if event.key == pygame.K_RIGHT or event.key == ord('d'):
                        self.changeto = 'RIGHT'
                    
                    # Esc -> Create event to quit the game
                    if event.key == pygame.K_ESCAPE:
                        pygame.event.post(pygame.event.Event(pygame.QUIT))


            if np.random.random() > self.EPSILON:
                self.changeto = numtoactions[theaction]
            else:
                if manual != True:
                    #self.changeto = numtoactions[np.random.randint(0, len(actionstonum))]
                    self.changeto = self.get_direction()
                    
            if self.changeto == 'UP' and self.direction != 'DOWN':
                self.direction = 'UP'
            if self.changeto == 'DOWN' and self.direction != 'UP':
                self.direction = 'DOWN'
            if self.changeto == 'LEFT' and self.direction != 'RIGHT':
                self.direction = 'LEFT'
            if self.changeto == 'RIGHT' and self.direction != 'LEFT':
                self.direction = 'RIGHT'

            if self.direction == 'UP':
                self.snake_pos[1] -= 10
            if self.direction == 'DOWN':
                self.snake_pos[1] += 10
            if self.direction == 'LEFT':
                self.snake_pos[0] -= 10
            if self.direction == 'RIGHT':
                self.snake_pos[0] += 10

            self.snake_body.insert(0, list(self.snake_pos))
            if self.snake_pos[0] == self.food_pos[0] and self.snake_pos[1] == self.food_pos[1]:
                self.score += 1
                self.reward = self.MAXREWARD
                self.food_spawn = False
            else:
                self.snake_body.pop()
                self.reward = self.MOVEPENALTY

            if not self.food_spawn:
                self.food_pos = [random.randrange(1, (self.frame_size_x//10)) * 10, random.randrange(1, (self.frame_size_y//10)) * 10]
            self.food_spawn = True

            self.game_window.fill(self.black)
            n = 0
            for pos in self.snake_body:

                if n == 0:
                    pygame.draw.rect(self.game_window, self.mag, pygame.Rect(pos[0], pos[1], 10, 10))
                else:
                    pygame.draw.rect(self.game_window, self.green, pygame.Rect(pos[0], pos[1], 10, 10))
                n += 1
                

            pygame.draw.rect(self.game_window, self.white, pygame.Rect(self.food_pos[0], self.food_pos[1], 10, 10))

            if self.snake_pos[0] < 0 or self.snake_pos[0] > self.frame_size_x-10:
                self.game_over(i,j)
                return
            if self.snake_pos[1] < 0 or self.snake_pos[1] > self.frame_size_y-10:
                self.game_over(i,j)
                return

            for block in self.snake_body[1:]:
                if self.snake_pos[0] == block[0] and self.snake_pos[1] == block[1]:
                    self.game_over(i,j)
                    return

            pygame.display.update()

            img2 = np.frombuffer(pygame.image.tostring(self.game_window, "RGB"), dtype=np.uint8)
            self.imgresh2 = np.reshape(img2,(self.frame_size_x,self.frame_size_y, 3))
            self.imgresh2 = cv2.resize(self.imgresh2, dim, interpolation = cv2.INTER_NEAREST )
            
            self.controller.tick(self.speed)

            if j > 0:
                if self.reward == self.MAXREWARD:
                    self.train(i,j, False)
                elif j%4 == 0:
                    self.train(i,j, False)
      
            j += 1
                  
    def game_over(self,i,j):
        self.reward = self.PENALTY

        img2 = np.frombuffer(pygame.image.tostring(self.game_window, "RGB"), dtype=np.uint8)
        self.imgresh2 = np.reshape(img2,(self.frame_size_x,self.frame_size_y, 3))
        self.imgresh2 = cv2.resize(self.imgresh2, dim, interpolation = cv2.INTER_NEAREST )
        
        self.train(i,j, True)
        
        self.overall_score += self.score
        
        self.game_window.fill(self.black)
        pygame.display.flip()                         
        pygame.quit()

    def get_maxi(self):
        
        maxi = 0
        
        for item in self.shufflelist:
            curr = item[0]
            s = re.findall(r'\d+', curr)[0]
            if int(s) > maxi:
                maxi = int(s)
        
        return maxi

    def get_direction(self):

        x = self.snake_pos[0] - self.food_pos[0]
        x1 = self.snake_body[1][0] - self.food_pos[0]
        
        y = self.snake_pos[1] - self.food_pos[1]
        y1 = self.snake_body[1][1] - self.food_pos[1]
        

        direction = None
        direction_h = None
        direction_v = None

        if x > 0:
            direction_h = 'LEFT'
        else:
            direction_h = 'RIGHT'

        if y > 0:
            direction_v = 'UP'
        else:
            direction_v = 'DOWN'
                           

        if abs(x) > abs(y):
            direction = direction_h
            
            if y == y1 and (abs(x) > abs(x1)):
                #print(f"  hit v x: {abs(x)} x1: {abs(x1)} y: {y} y1: {y1}")
                direction = direction_v
        else:
            direction = direction_v
            if x == x1 and (abs(y) > abs(y1)):
                #print(f"  hit h x: {abs(y)} x1: {abs(y1)} y: {x} y1: {x1}")
                direction = direction_h
            
        return direction
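
The EPSILON comparison used in run can be tried on its own. The sketch below is a simplified stand-in, not the method itself: choose_action is a hypothetical helper, the predicted action and the fallback are fixed, and the random generator is seeded so that the result is repeatable.

```python
import random

numtoactions = {0: "RIGHT", 1: "LEFT", 2: "UP", 3: "DOWN"}


def choose_action(predicted, fallback, epsilon, rng):
    # Same comparison as in run: the prediction wins only if the draw exceeds epsilon.
    if rng.random() > epsilon:
        return numtoactions[predicted]
    return fallback


rng = random.Random(0)
# The model always predicts UP (2) here, the fallback is always RIGHT.
choices = [choose_action(2, "RIGHT", 0.3, rng) for _ in range(10_000)]
share_predicted = choices.count("UP") / len(choices)
```

With EPSILON set to 0.3, roughly 70 percent of the actions come from the model and the rest from the fallback. With the manual setting of 0.999, almost every action is left to the player.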

The code below shows again the class Game, however without the methods run, game_over, get_direction and get_maxi, which were described above.

The methods in the code below follow the description in [1] and the theory of deep reinforcement learning. In [1] the authors suggest a replay memory to avoid that a batch of training data contains very similar current states. Their argument is that such similarities can cause feedback loops, which are bad for convergence.
Game's method load_replay_memory loads all current states (currentpicname), actions (action), rewards (reward), future states (nextpicname) and termination information (terminated) into the list shufflelist. The list shufflelist is then randomly reordered. It now represents the replay memory.
The method save_replay_memory saves a pandas DataFrame to disk as a csv file (datacsvname) with all the information needed to reload shufflelist at a later time. If shufflelist has grown beyond REPLAYSIZE, the entries at the front of the list and their image files are removed first.
The method pop_batch removes the first batchsize entries from the replay memory. Each entry of the list shufflelist holds the current state and the future state as file names. The method pop_batch loads both images (img1, img2) and adds them to a batch list, which is finally returned together with the file names. The entries of the batch list are used for training the neural network model. The method push_batch does the opposite. It adds a batch list to the end of the list shufflelist.
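
The pop and push cycle described above can be illustrated with a small, self-contained sketch. It is not the Game implementation: the class name ReplayMemory is hypothetical, the entries are plain tuples of file names without any image loading, and the seeded random.Random is only assumed here to make the shuffle reproducible.

```python
import random


class ReplayMemory:
    """Hypothetical stand-in for the shufflelist handling in Game."""

    def __init__(self, entries, seed=0):
        self.entries = list(entries)
        # Shuffle once so that consecutive (very similar) transitions
        # do not end up next to each other in the same batch.
        random.Random(seed).shuffle(self.entries)

    def pop_batch(self, batchsize):
        # Take the first `batchsize` entries off the front of the list.
        batch = self.entries[:batchsize]
        del self.entries[:batchsize]
        return batch

    def push_batch(self, batch):
        # Put the used entries back at the end of the list.
        self.entries.extend(batch)


memory = ReplayMemory(
    [(f"current_{i}.png", i % 4, 0.0, f"next_{i}.png", False) for i in range(10)]
)
batch = memory.pop_batch(3)
remaining = len(memory.entries)
memory.push_batch(batch)
```

Because the batch is appended at the end again, every entry is visited once before the first one is reused, as long as no new entries are added in between.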

The next method is get_X. It prepares the batch list for training the neural network. The images in the batch list (the current state at index 0 or the future state at index 3 of each entry) are converted to a numpy array and normalized by dividing by 255.0.

The method backprop trains the neural network. Its code was essentially taken from the Keras site. It applies the Bellman equation known from deep reinforcement learning theory. In short, it uses model_target to predict the future rewards (future_rewards) from the future states (Xf). It then uses the Bellman equation to calculate the new Q values (updated_q_values) to be trained into model. For terminated states the target is set to the negative penalty instead. Finally it calculates the loss value (loss) for the current batch list (X) and applies the resulting gradients to the model.
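
To make the target calculation concrete, here is a minimal NumPy sketch of the two lines in backprop that compute updated_q_values. The function name bellman_targets and the example numbers are assumptions for illustration. The discount factor 0.99 is the one used in backprop, and the penalty value of the example is made up.

```python
import numpy as np


def bellman_targets(rewards, future_q, done, gamma=0.99, penalty=1.0):
    """Compute Q-learning targets for a batch (illustrative helper).

    rewards:  shape (batch,)    reward of each transition
    future_q: shape (batch, 4)  target-network Q values of the future states
    done:     shape (batch,)    1.0 where the game terminated, else 0.0
    """
    rewards = np.asarray(rewards, dtype=np.float32)
    future_q = np.asarray(future_q, dtype=np.float32)
    done = np.asarray(done, dtype=np.float32)
    # Bellman equation: reward plus discounted best future Q value.
    targets = rewards + gamma * future_q.max(axis=1)
    # Terminated transitions have no future; their target is the negative penalty.
    return targets * (1.0 - done) - done * abs(penalty)


targets = bellman_targets(
    rewards=[1.0, 0.0],
    future_q=[[0.5, 2.0, 0.0, 1.0], [3.0, 1.0, 0.0, 0.0]],
    done=[0.0, 1.0],
    gamma=0.99,
    penalty=-10.0,
)
```

The first transition gets the target 1.0 + 0.99 * 2.0, the second one is terminated and therefore gets the negative penalty regardless of its future Q values.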

Game's method train is simply a wrapper. It pulls a batch list with the method pop_batch. The current and future images (imgresh1, imgresh2) are added to the batch list and saved to disk (write), so the batch list grows by one entry (its size is now 20 instead of 19). Then it trains the model with the method backprop and pushes the batch list back to shufflelist.

class Game:
    # constructor and initialize, see above
    # run, game_over, get_direction, get_maxi, see above 

    def load_replay_memory(self):

        f = open(os.path.join(pathname, datadirname, datacsvname), "r")
        
        df = pd.read_csv(f, index_col = 0) 

        for index, row in df.iterrows():

            currentpicname = row["currentstate"]
            action = actionstonum[row["action"]]
            reward = row["reward"]
            nextpicname = row["nextstate"]
            terminated = row["terminated"]
            sxpos = row["sxpos"]
            sypos = row["sypos"]
            fxpos = row["fxpos"]
            fypos = row["fypos"]
            
            self.shufflelist.append([currentpicname,action,reward,nextpicname, terminated, sxpos, sypos, fxpos, fypos])

        random.shuffle(self.shufflelist)
    
        f.close()
        
        return

    def save_replay_memory(self):
           
        data = []
        
        if len(self.shufflelist) == 0:
            return
        
        if len(self.shufflelist) > self.REPLAYSIZE:
            
            self.numbatches = len(self.shufflelist) - self.REPLAYSIZE
            self.overall_numbatches += self.numbatches
            
            for i in range(len(self.shufflelist) - self.REPLAYSIZE):
                item = self.shufflelist.pop(0)
                os.remove(os.path.join(self.path,item[0]))
                os.remove(os.path.join(self.path,item[3]))
                
        for (cs, act, rew, fs, term, sx, sy, fx, fy) in self.shufflelist:
            
            data.append({'currentstate': cs, 'action': numtoactions[act], 'reward': rew, 'nextstate': fs, 'terminated': term, 'sxpos': sx, 'sypos': sy, 'fxpos': fx, 'fypos': fy})
            
        df = pd.DataFrame(data) 
        
        df.to_csv(os.path.join(self.path, datacsvname)) 
        
        return
    
    
    def pop_batch(self, batchsize):
       
        batch = []
        files = []
    
        for i in range(batchsize):
            
            item = self.shufflelist.pop(0)
            
            img1 = cv2.imread(os.path.join(self.path, item[0]),cv2.IMREAD_COLOR )
            img2 = cv2.imread(os.path.join(self.path, item[3]),cv2.IMREAD_COLOR )

            batch.append([img1, item[1], item[2], img2, item[4], item[5], item[6], item[7], item[8]])
            files.append((item[0],item[3]))

        return batch, files

    def push_batch(self, batch, files):
       
        for index,item in enumerate(batch):

            self.shufflelist.append([files[index][0], item[1], item[2], files[index][1], item[4], item[5], item[6], item[7], item[8]])
    
        return
    
    def get_X(self, batch, state):

        assert state == 0 or state == 3 # 0 is currentstate, 3 is future state
        
        X = [item[state] for item in batch]
        X = np.array(X, dtype=np.float32)
        X /= 255.0
        
        return X
    

    def backprop(self, batch):

        rewards_sample = [batch[i][2] for i in range(len(batch))]
        action_sample = [batch[i][1] for i in range(len(batch))]
        done_sample = tf.convert_to_tensor([float(batch[i][4]) for i in range(len(batch))])

        X =  self.get_X(batch, 0)
        Xf = self.get_X(batch, 3)
        future_rewards = self.model_target.predict(Xf)

        updated_q_values = rewards_sample + 0.99 * tf.reduce_max(future_rewards, axis=1)
        updated_q_values = updated_q_values * (1 - done_sample) - done_sample*abs(self.PENALTY)

    
        masks = tf.one_hot(action_sample, 4)

        with tf.GradientTape() as tape:

            q_values = self.model(X)

            q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)

            loss = self.loss_function(updated_q_values, q_action)
            
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        
    def train(self, i, j, term):
        
        # https://pythonprogramming.net/training-deep-q-learning-dqn-reinforcement-learning-python-tutorial/
        
        currentstate = "current_{}_{}.png".format(i,j)

        nextstate = "next_{}_{}.png".format(i,j)      
        
        batch, files = self.pop_batch(self.BATCHSIZE)
            
        batch.append([self.imgresh1, actionstonum[self.changeto], self.reward, self.imgresh2, term, self.snake_pos[0], self.snake_pos[1], self.food_pos[0], self.food_pos[1]])
        files.append(("current_{}_{}.png".format(i,j), "next_{}_{}.png".format(i,j)))
        
        self.write(i,j)
         
        self.backprop(batch)
        
        self.numbatches += 1
            
        self.push_batch(batch, files)   
  
        return
    

The remaining methods of Game are utility methods. The first one is print_benchmark. After training a model we need some indication of whether the training is moving in a positive or a negative direction, and print_benchmark provides it. The method first creates two lists: maxlist and penaltylist. The list maxlist contains all states in which the snake ate food. The list penaltylist contains all states in which the snake ran into the window frame border or into its own body. The method then tests the trained model on the content of maxlist and penaltylist and checks whether the predictions are correct. The value pmerror is the percentage of states in maxlist for which the model predicts a different move than the recorded move towards the food. Ideally this value is zero. The value pterror is the percentage of states in penaltylist for which the model predicts the move that led to the termination. Ideally this value is zero as well. Further benchmarks are the averaged Q values. They are important to watch, because they should not explode and should converge to the same number for all actions.
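
The counting behind pmerror can be sketched without a trained model. In the snippet below the Q values are made-up numbers and the helper name mismatch_percentage is hypothetical. It only shows how the predicted action (argmax of the Q values) is compared with the recorded action. For pterror the comparison is reversed: there a match counts as an error.

```python
import numpy as np


def mismatch_percentage(q_values, recorded_actions):
    """Percentage of states where argmax(Q) differs from the recorded action."""
    q_values = np.asarray(q_values, dtype=np.float32)
    recorded_actions = np.asarray(recorded_actions)
    predicted = q_values.argmax(axis=1)
    return 100.0 * np.mean(predicted != recorded_actions)


# Four made-up states with Q values for the four actions.
q = [[0.9, 0.1, 0.0, 0.0],
     [0.2, 0.7, 0.1, 0.0],
     [0.1, 0.2, 0.3, 0.4],
     [0.5, 0.1, 0.2, 0.2]]
actions_taken = [0, 1, 2, 0]

pmerror = mismatch_percentage(q, actions_taken)
# Average Q value per action, comparable to the "Q Values for max" output.
average_q = np.mean(np.asarray(q), axis=0)
```

Only the third state is predicted differently from the recorded action, so one of four states counts as an error.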

The methods print_score and print_overall_score print out the score of the current game or the score of a large set of games. Scores can also be used as benchmarks.

Finally, we have the method run_replay_memory. It loads the complete replay data into the list shufflelist and runs the method backprop on the content of shufflelist. There is no need to use the method run to train the model here. However, we still need the method run to generate more training data and to refresh the replay memory.

class Game:
    # constructor and initializer, see above
    # run, game_over, get_direction, get_maxi, see above 
    # load_replay_memory, save_replay_memory, pop_batch, push_batch, get_X, backprop, train, see above
  
    def print_benchmark(self):

        maxlist = []
        penaltylist = []
        # NumPy arrays, so that "+=" adds element-wise instead of extending a list
        averagestates = np.zeros(4)
        averagepenalty = np.zeros(4)
        pmerror = 0
        pterror = 0

        for (cs, act, rew, fs, term, sx, sy, fx, fy) in self.shufflelist:
            if rew == self.MAXREWARD or rew == 30.0:
                maxlist.append((cs,act,rew,fs,term))
            if rew == self.PENALTY:
                penaltylist.append((cs,act,rew,fs,term))
        print(f"Number of maxrewards in shufflelist: {len(maxlist)}, perc: {100*len(maxlist)/len(self.shufflelist)}")
        print(f"Number of terminations in shufflelist: {len(penaltylist)}, perc: {100*len(penaltylist)/len(self.shufflelist)}")
        
        count = 0
        
        print("Testing maxlist")
        for i in range(len(maxlist)):
            img = cv2.imread(os.path.join(pathname, datadirname, maxlist[i][0]),cv2.IMREAD_COLOR )
            states = self.model.predict(np.array([img])/255.0, batch_size=1, verbose=0)[0]
            averagestates += states
            if np.argmax(states) != maxlist[i][1]:
                count += 1
            pmerror = 100*count/len(maxlist)
        print(f"Number of predicted errors in maxlist: {count}, perc: {pmerror}")
        print(f"Q Values for max: {averagestates/len(maxlist)}")
        
        count = 0
        
        print("Testing penaltylist") 
        for i in range(len(penaltylist)):
            img = cv2.imread(os.path.join(pathname, datadirname, penaltylist[i][0]),cv2.IMREAD_COLOR )
            states = self.model.predict(np.array([img])/255.0, batch_size=1, verbose=0)[0]
            averagepenalty += states
            if np.argmax(states) == penaltylist[i][1]:
                count += 1
            pterror = 100*count/len(penaltylist)
        print(f"Number of predicted terminations in penaltylist: {count}, perc: {pterror}")
        print(f"Q Values for penalty: {averagepenalty/len(penaltylist)}")
        
        return pmerror, averagestates/len(maxlist), averagepenalty/len(penaltylist)
    
    def print_score(self):
        print(f" ----> TIME IS {datetime.now():%Y-%m-%d_%H-%M-%S}")
        print(f" ----> SCORE is {self.score}")
        print(f" ----> NUM OF BATCHES is {self.numbatches}")
        return self.score, self.numbatches
    
    def print_overall_score(self):
        print(f"--> TIME IS {datetime.now():%Y-%m-%d_%H-%M-%S}")
        print(f"--> OVERALL SCORE is {self.overall_score}")
        print(f"--> OVERALL NUM OF BATCHES is {self.overall_numbatches}")
        return self.overall_score, self.overall_numbatches     
    
    def run_replay_memory(self, epochs = 5):
        self.model.load_weights(self.checkpointname)
        self.load_replay_memory()
        for j in range(epochs):
            
            for i in range(int(len(self.shufflelist)//(self.BATCHSIZE+1))):
                if i%500 == 0:
                    print(i)
                batch, files = self.pop_batch(self.BATCHSIZE+1)
                self.backprop(batch)
                self.push_batch(batch,files)

            self.print_benchmark()
            self.save_checkpoint()

The class Game has now been explained sufficiently. Below is the code that executes instances of Game to generate training data and to train the model. The function run_game instantiates Game several times (range(iterations)) within a loop, with a given learning rate as a parameter. In an inner loop it initializes and runs games with initialize and run. Afterwards it prints the benchmark with print_benchmark. If the benchmark improves, a checkpoint of the model is saved. In each iteration the list shufflelist is saved back to disk with the method save_replay_memory.

def run_game(learning_rate = 1.5e-06, epochs = 5, benchmin = 68.0):
    manual = False
    lr = [learning_rate for i in range(epochs)]

    iterations = len(lr)
    benches = []
    qms = []
    qps = []
    counter = 0
    overallscores = []  # results of print_overall_score, one per iteration

    for i in range(iterations):
        print(f"{i}: learning rate: {lr[i]}")
        game = Game(lr[i], "model-regr.h5")
        k = 150
        game.load_replay_memory()
        for j in range(k):
            game.initialize(i, j)
            game.run(j)
        bench, qm, qp = game.print_benchmark()
        benches.append(bench)
        qms.append(qm)
        qps.append(qp)
        game.save_replay_memory()
        game.save_checkpoint(f"model-regr_{i}_{lr[i]:.9f}_{bench:.2f}.h5")
        if bench < benchmin:
            benchmin = bench
            game.save_checkpoint()
        else:
            counter += 1
        if counter == 3:
            counter = 0
            lr = [rate * 0.5 for rate in lr]  # lr is a list, so halve each entry
            
        overallscore = game.print_overall_score()
        overallscores.append(overallscore)
    return benches, qms, qps
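
The intent of the counter and the halving step in run_game can be isolated in a few lines. The sketch below is an illustration under assumptions, not the code used for the experiments: the function name halve_after_stalls and the benchmark numbers are made up. As in run_game, the stall counter is not reset when the benchmark improves.

```python
def halve_after_stalls(benchmarks, learning_rate, benchmin, patience=3):
    """Return the learning rate that was used in each iteration.

    benchmarks: benchmark value at the end of each iteration
                (lower is better, as with pmerror).
    """
    used = []
    stalls = 0
    for bench in benchmarks:
        used.append(learning_rate)
        if bench < benchmin:
            benchmin = bench      # new best value
        else:
            stalls += 1           # no improvement in this iteration
        if stalls == patience:
            stalls = 0
            learning_rate *= 0.5  # halve for the following iterations
    return used


rates = halve_after_stalls([70.0, 69.0, 71.0, 60.0, 65.0], 1.5e-06, benchmin=68.0)
```

In this example the first three iterations do not beat the threshold of 68.0, so the fourth and fifth iteration run with half the learning rate.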
     

Executing the code

The function run_game can be executed as shown in the code below. Its parameters are the learning rate, the number of epochs and the benchmark threshold.

run_game(1.5e-06, 5, 60.0)

To run training on the complete replay memory we execute the code below. The first parameter of the Game class is the learning rate, the second parameter is the name of the checkpoint file with the neural network's weights to be updated. The method run_replay_memory trains the neural network for a number of epochs, in this case five.

game = Game(6.0e-07, "model-regr.h5")
game.run_replay_memory(5)

The function run_game and the method run_replay_memory both execute print_benchmark, which prints an indication of how successful the training was. Below you find an example output of print_benchmark. It shows how many states in the replay memory are states in which the snake eats food (maxrewards) and how many are terminations. The lists maxlist and penaltylist (see the description of print_benchmark) are used to indicate how well the model predicts the eating of food and the terminations. In the example below, the eating of food is predicted badly in 51% of all cases and terminations are predicted badly in 41% of all cases. The output also shows the averaged Q values for each action (RIGHT, LEFT, UP, DOWN). Ideally they lie in a close range. You can interpret a Q value in this way: if you take the action RIGHT, the expected future reward is 0.51453086. This number should go up during training, but it also has its limits. The score of the snake game is always finite, because the snake grows each time it eats food, so the limit is given indirectly by the size of the frame window. It is important to know that the Q values of all actions must be the same on average, because statistically you get the same reward whichever direction the snake takes.

Number of maxrewards in shufflelist: 2783, perc: 6.9575
Number of terminations in shufflelist: 2042, perc: 5.105
Testing maxlist
Number of predicted errors in maxlist: 1437, perc: 51.63492633848365
Q Values for max: [0.51453086 0.5192453  0.50427304 0.48402   ]
Testing penaltylist
Number of predicted terminations in penaltylist: 856, perc: 41.9196865817
Q Values for penalty: [0.21559233 0.16576494 0.22125778 0.210446  ]
saving checkpoint: /home/inf/Dokumente/Reinforcement/model/model-regr.h5

Conclusion

We ran the code for days (specifically run_game and run_replay_memory) and experimented with the hyperparameters. During these runs we produced millions of frames. We first set the EPSILON value to 0.3. This means that about 30% of the moves went in the direction of the food with the help of Game's method get_direction. The remaining moves were predicted by the model.
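
The role of EPSILON can be sketched as follows. The function choose_action and the string labels are hypothetical and only stand in for the results of get_direction and of the model prediction. The seeded generator is an assumption to make the example reproducible.

```python
import random


def choose_action(epsilon, heuristic_action, model_action, rng):
    """With probability epsilon follow the heuristic, otherwise the model."""
    if rng.random() < epsilon:
        return heuristic_action
    return model_action


rng = random.Random(42)
choices = [choose_action(0.3, "heuristic", "model", rng) for _ in range(10000)]
heuristic_share = choices.count("heuristic") / len(choices)
```

Over many moves the share of heuristic moves approaches the EPSILON value, here roughly 30%.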

We were able to see that all average Q values for maxlist printed out by print_benchmark went above 1.6. This matches our observation that the overall score went to around 270 after playing 150 games.

After running the game further we set the EPSILON value to 0.2, so more moves were predicted by the model. Now the overall score went down: this time we reached a score of about 100 after playing 150 games. We could also see how the average Q values went down after some time.

On the Keras site there is a note that it takes 50 million frames to achieve very good results. However, at some point we stopped experimenting further, because progress was extremely slow or nonexistent. We are inclined to say that methods other than deep reinforcement learning are more promising. Despite the simplicity of the snake game, which is comparable to Atari games, we were a little discouraged after reading [2].

Outlook

Applied research must provide solutions to the problems of a programmer who wants to develop an application, so the programmer must get some benefit from it. However, we find that there are too many hyperparameters to tune (size of the replay memory, learning rate, batch size, EPSILON value, DISCOUNT value, dimension of the state images, and the list does not end here), and progress is still extremely slow or nonexistent. A programmer wants to have results within a defined period of time. We have the feeling that we do not have the development time under control with deep reinforcement learning.

Maybe there was just not enough training time for experimentation. Maybe the model with its five layers is not appropriate and needs fine-tuning. In any case, we think that other kinds of machine learning (supervised learning) are much more successful than deep reinforcement learning.

We are thinking about estimating the head position and the food position with a trained neural network from an image of the window frame. From these positions we then calculate the moving direction of the snake. The code above already records the head and food positions and saves them into the replay memory's csv file for each current state and future state, so this information comes for free. Small experiments showed that the head and food positions can be predicted very well with the same neural network model (instead of Q values we estimate the positions of the snake head and the food).

We have not given up completely, but we are taking a break here. There might be more to come in the future.

Acknowledgement

Special thanks to the University of Applied Science Albstadt-Sigmaringen for providing the infrastructure and the appliances to enable this research.

References

[1]: Playing Atari with Deep Reinforcement Learning, Volodymyr Mnih et al.

[2]: Deep Reinforcement Learning Doesn’t Work Yet

[3]: Deep Q-Learning for Atari Breakout

[4]: Snake Game.py

Street Scene Segmentation with a UNET-Model

For the summer semester 2020 we offered an elective class called Design Cyber Physical Systems. This time only a few students enrolled in the class. The advantage of small classes is that we can focus on a certain topic without losing the overview during project execution. The class starts with a brainstorming session to choose a topic to be worked on during the semester. There are only a few constraints: the topic must have something to do with image processing, neural networks and deep learning. The students came up with five ideas and presented them to the instructor, who commented on the ideas and evaluated them. Finally the students chose one of the ideas to work on until the end of the semester.

This time the students chose to process street scenes taken from videos recorded while driving a car. A street scene can be seen in Figure 1. The videos were taken in driving direction through the front window of a car. The assignment the students gave themselves is to extract the street, street markings, traffic signs and cars from the video images. This kind of problem is called semantic segmentation, and we solve it in a similar fashion as in the previous post. Therefore many functions in this post and in the previous post are quite similar.

Figure 1: Street scene

Creating Training Data

The students took about ten videos while driving a car to create the training data. Since videos are just sequences of images, they randomly selected around 250 images from the videos. Above we mentioned that we want to extract street, street marking, traffic signs and cars. These are four categories. There is a fifth one for everything that is none of those (the background category). Below you find the code of three Python dictionaries. The first is the dictionary classes, which assigns category numbers to the label texts (for example, Strasse (Street in English) is assigned the number 1). The dictionary categories assigns them the other way around. Finally, the dictionary colors assigns a color to each category number.

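# label text -> category number (German labels, upper and lower case spelling)
classes = {"Strasse": 1,   # street
           "strasse": 1,
           "Streifen": 2,  # street marking
           "streifen": 2,
           "Auto": 3,      # car
           "auto": 3,
           "Schild": 4,    # traffic sign
           "schild": 4,
          }

# category number -> label text
categories = {1: "Strasse",
              2: "Streifen",
              3: "Auto",
              4: "Schild",
             }

# category number -> color, 0 is the background
colors = {0: (0, 0, 0),
          1: (0, 0, 255),
          2: (0, 255, 0),
          3: (255, 0, 0),
          4: (0, 255, 255),
         }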

dim = (256, 256) 
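
As a small illustration of how the dictionary colors can be used, the following sketch turns a mask of category numbers into a color image with a NumPy lookup table. The function name colorize is hypothetical. The color tuples are assumed to be in OpenCV's BGR order, which fits the colors described for the overlay in Figure 3.

```python
import numpy as np

# Same color table as above; BGR channel order is an assumption.
colors = {0: (0, 0, 0), 1: (0, 0, 255), 2: (0, 255, 0), 3: (255, 0, 0), 4: (0, 255, 255)}


def colorize(mask):
    """Turn a 2-D array of category numbers into a 3-channel color image."""
    lut = np.zeros((max(colors) + 1, 3), dtype=np.uint8)
    for category, color in colors.items():
        lut[category] = color
    # Indexing the lookup table with the mask replaces each number by its color.
    return lut[mask]


mask = np.array([[0, 1], [2, 4]], dtype=np.uint8)
colored = colorize(mask)
```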

To create training data we decided to use the tool labelme, which is described here. You can load images into the tool and mark regions by drawing polygons around them, see Figure 2. The regions can be categorized with names, which are the same as in the Python dictionary categories, see code above. You can save the polygons, their categories and the image itself to a json file. It is then very easy to parse the json file with a Python parser.

Figure 2: Tool labelme
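
A json file written by labelme can be read with Python's standard modules json and base64. The sketch below uses a synthetic, heavily shortened stand-in for such a file: only the keys shapes, label, points and imageData are shown, the coordinates are made up and the image bytes are a placeholder instead of a real encoded image.

```python
import base64
import json

# Synthetic stand-in for a labelme file; real files contain more keys and
# an encoded image in "imageData" instead of these placeholder bytes.
example = json.dumps({
    "shapes": [
        {"label": "Strasse", "points": [[0, 200], [320, 200], [320, 240], [0, 240]]},
        {"label": "Auto", "points": [[100, 120], [180, 120], [180, 180], [100, 180]]},
    ],
    "imageData": base64.b64encode(b"placeholder image bytes").decode("ascii"),
})

data = json.loads(example)
image_bytes = base64.b64decode(data["imageData"])
labels = [shape["label"] for shape in data["shapes"]]
# Points are stored as [x, y]; swap them to (row, column) for array indexing.
first_polygon_rc = [(y, x) for x, y in data["shapes"][0]["points"]]
```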

The image itself is stored in the json file in a base64 representation, and Python has tools to decode it as well. Videos generally do not store square images, so we convert the frames into square images, which are better suited for training the neural network. We have written two functions for this task: makesquare2 and makesquare3. The difference between them is that the first handles grayscale images and the second RGB images. Both functions simply cut off equal portions on the left and the right of the original video image to create a square image.

def makesquare2(img):
    
    assert(img.ndim == 2) 
    
    edge = min(img.shape[0],img.shape[1])
        
    img_sq = np.zeros((edge, edge), 'uint8')
    
    if(edge == img.shape[0]):
        img_sq[:,:] = img[:,int((img.shape[1] - edge)/2):int((img.shape[1] - edge)/2)+edge]
    else:
        img_sq[:,:] = img[int((img.shape[0] - edge)/2):int((img.shape[0] - edge)/2)+edge,:]

    assert(img_sq.shape[0] == edge and img_sq.shape[1] == edge)
    
    return img_sq

def makesquare3(img):
    
    assert(img.ndim == 3)
    
    edge = min(img.shape[0],img.shape[1])
        
    img_sq = np.zeros((edge, edge, 3), 'uint8')
    
    if(edge == img.shape[0]):
        img_sq[:,:,:] = img[:,int((img.shape[1] - edge)/2):int((img.shape[1] - edge)/2)+edge,:]
    else:
        img_sq[:,:,:] = img[int((img.shape[0] - edge)/2):int((img.shape[0] - edge)/2)+edge,:,:]

    assert(img_sq.shape[0] == edge and img_sq.shape[1] == edge)
    
    return img_sq
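
Both functions can also be written as one function, because NumPy slicing on the first two axes works for 2-D and 3-D arrays alike. The following variant is only a sketch of that idea, with the hypothetical name center_crop_square. It is not the code used in this project.

```python
import numpy as np


def center_crop_square(img):
    """Crop the longer side symmetrically so that height equals width."""
    height, width = img.shape[:2]
    edge = min(height, width)
    top = (height - edge) // 2
    left = (width - edge) // 2
    # Slicing only the first two axes keeps an optional channel axis untouched.
    return img[top:top + edge, left:left + edge].copy()


frame = np.arange(4 * 6 * 3, dtype=np.uint8).reshape(4, 6, 3)
gray = frame[:, :, 0]
square_rgb = center_crop_square(frame)
square_gray = center_crop_square(gray)
```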

Below you see two functions createMasks and createMasksAugmented. We need the functions to create training data from the json files, such as original images and mask images. Both functions have been described in the post before. Therefore we dig only into the differences.

In both functions you find a special handling for the category Strasse (Street in English). It is implemented with the following condition:

classes[shape['label']] != classes['Strasse']

Regions in the images that are marked as Strasse (Street in English) can overlap with regions marked as Streifen (Street Marking in English) or Auto (Car in English). We do not want street regions to overrule the street marking or car regions, otherwise the street markings or the cars would disappear. For this reason we first create a separate mask mask_strasse. All other category masks are subtracted from mask_strasse, and then mask_strasse is added to finalmask. Again, we just want to make sure that the street marking mask and the other masks are not overwritten by the street mask.
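
The priority rule can be shown on a tiny example with plain NumPy. The function combine_masks is a hypothetical sketch of the idea and not the implementation below. It deliberately differs in one detail: it merges several street polygons with a boolean union instead of adding them, so overlapping street polygons keep the category number 1.

```python
import numpy as np

STREET = 1  # category number of "Strasse" in the dictionary classes


def combine_masks(street_masks, other_masks, shape):
    """Merge per-polygon masks so that the street never hides other categories."""
    final = np.zeros(shape, dtype=np.uint8)
    # Later non-street masks overwrite earlier ones where they overlap.
    for m in other_masks:
        final[m > 0] = m[m > 0]
    # Union of all street polygons.
    street = np.zeros(shape, dtype=bool)
    for m in street_masks:
        street |= m > 0
    # Street pixels are only written where no other category is present.
    final[street & (final == 0)] = STREET
    return final


street_polygon = np.zeros((3, 4), dtype=np.uint8)
street_polygon[1:3, :] = 1
marking = np.zeros((3, 4), dtype=np.uint8)
marking[1, 1:3] = 2
combined = combine_masks([street_polygon], [marking], (3, 4))
```

In the result the street marking (2) stays visible inside the street region (1), and the first row remains background (0).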

def createMasks(sourcejsonsdir, destimagesdir, destmasksdir):

    assocf = open(os.path.join(path,"assoc_orig.txt"), "w")
    count = 0
    directory = sourcejsonsdir
    for filename in os.listdir(directory):
        if filename.endswith(".json"):
            print("{}:{}".format(count,os.path.join(directory, filename)))
            
            # the with-statement closes each json file right after reading it
            with open(os.path.join(directory, filename)) as f:
                data = json.load(f)
            img_arr = data['imageData']  
            imgdata = base64.b64decode(img_arr)

            img = cv2.imdecode(np.frombuffer(imgdata, dtype=np.uint8), flags=cv2.IMREAD_COLOR)
            
            assert (img.shape[0] > dim[0])
            assert (img.shape[1] > dim[1])
           
            finalmask = np.zeros((img.shape[0], img.shape[1]), 'uint8')
        
            masks=[]
            masks_strassen=[]
            mask_strasse = np.zeros((img.shape[0], img.shape[1]), 'uint8')
            
            for shape in data['shapes']:
                assert(shape['label'] in classes)

                vertices = np.array([[point[1],point[0]] for point in shape['points']])
                vertices = vertices.astype(int)

                rr, cc = polygon(vertices[:,0], vertices[:,1], img.shape)
                mask_orig = np.zeros((img.shape[0], img.shape[1]), 'uint8')

                mask_orig[rr,cc] = classes[shape['label']]
                if classes[shape['label']] != classes['Strasse']:
                    masks.append(mask_orig)
                else:
                    masks_strassen.append(mask_orig)
                    
            for m in masks_strassen:
                mask_strasse += m
                    
            for m in masks:
                _,mthresh = cv2.threshold(m,0,255,cv2.THRESH_BINARY_INV)
                finalmask = cv2.bitwise_and(finalmask,finalmask,mask = mthresh)
                finalmask += m

            _,mthresh = cv2.threshold(finalmask,0,255,cv2.THRESH_BINARY_INV)
            mask_strasse = cv2.bitwise_and(mask_strasse,mask_strasse, mask = mthresh)
            finalmask += mask_strasse  
                
            img = makesquare3(img)
            finalmask = makesquare2(finalmask)

            img_resized = cv2.resize(img, dim, interpolation = cv2.INTER_NEAREST)
            finalmask_resized = cv2.resize(finalmask, dim, interpolation = cv2.INTER_NEAREST)
            
            filepure,extension = splitext(filename)
            
            cv2.imwrite(os.path.join(destimagesdir, "{}o.png".format(filepure)), img_resized)
            cv2.imwrite(os.path.join(destmasksdir, "{}o.png".format(filepure)), finalmask_resized)

            assocf.write("{:05d}o:{}\n".format(count,filename))
            assocf.flush()
            count += 1

        else:
            continue
    assocf.close()  # close the association file once all json files are processed

In createMasks you find the usage of makesquare3 and makesquare2, since the video images are not square and we prefer square images for neural network training.

def createMasksAugmented(ident, sourcejsonsdir, destimagesdir, destmasksdir):

    assocf = open(os.path.join(path,"assoc_{}_augmented.txt".format(ident)), "w")

    count = 0
    directory = sourcejsonsdir
    for filename in os.listdir(directory):
        if filename.endswith(".json"):
            print("{}:{}".format(count,os.path.join(directory, filename)))

            # the with-statement closes each json file right after reading it
            with open(os.path.join(directory, filename)) as f:
                data = json.load(f)

            img_arr = data['imageData']  
            imgdata = base64.b64decode(img_arr)

            img = cv2.imdecode(np.frombuffer(imgdata, dtype=np.uint8), flags=cv2.IMREAD_COLOR)
            
            assert (img.shape[0] > dim[0])
            assert (img.shape[1] > dim[1])
            
            zoom = randint(75,90)/100.0
            angle = (2*random()-1)*3.0
            img_rotated = imutils.rotate_bound(img, angle)
            
            xf = int(img_rotated.shape[0]*zoom)
            yf = int(img_rotated.shape[1]*zoom)            
            
            img_zoomed = np.zeros((xf, yf, img_rotated.shape[2]), 'uint8')
            img_zoomed[:,:,:] = img_rotated[int((img_rotated.shape[0]-xf)/2):int((img_rotated.shape[0]-xf)/2)+xf,int((img_rotated.shape[1]-yf)/2):int((img_rotated.shape[1]-yf)/2)+yf,:] 
        

            finalmask = np.zeros((img_zoomed.shape[0], img_zoomed.shape[1]), 'uint8')
            mthresh = np.zeros((img_zoomed.shape[0], img_zoomed.shape[1]), 'uint8')
            masks=[]
            masks_strassen=[]
            mask_strasse = np.zeros((img_zoomed.shape[0], img_zoomed.shape[1]), 'uint8')

            for shape in data['shapes']:
                assert(shape['label'] in classes)

                vertices = np.array([[point[1],point[0]] for point in shape['points']])
                vertices = vertices.astype(int)

                rr, cc = polygon(vertices[:,0], vertices[:,1], img.shape)
                mask_orig = np.zeros((img.shape[0], img.shape[1]), 'uint8')
                mask_orig[rr,cc] = classes[shape['label']]
                
                mask_rotated = imutils.rotate_bound(mask_orig, angle)
                
                mask_zoomed = np.zeros((xf, yf), 'uint8')
                mask_zoomed[:,:] = mask_rotated[int((img_rotated.shape[0]-xf)/2):int((img_rotated.shape[0]-xf)/2)+xf,int((img_rotated.shape[1]-yf)/2):int((img_rotated.shape[1]-yf)/2)+yf] 
                
                if classes[shape['label']] != classes['Strasse']:
                    masks.append(mask_zoomed)
                else:
                    masks_strassen.append(mask_zoomed)

            for m in masks_strassen:
                mask_strasse += m
                    
            for m in masks:
                _,mthresh = cv2.threshold(m,0,255,cv2.THRESH_BINARY_INV)
                finalmask = cv2.bitwise_and(finalmask,finalmask,mask = mthresh)
                finalmask += m

            _,mthresh = cv2.threshold(finalmask,0,255,cv2.THRESH_BINARY_INV)
            mask_strasse = cv2.bitwise_and(mask_strasse,mask_strasse, mask = mthresh)
            finalmask += mask_strasse    
    
            # contrast -> alpha between 0.8 and 1.2; brightness -> beta between 0 and 14
            alpha = 0.8 + 0.4*random()
            beta = int(random()*15)
    
            img_adjusted = cv2.convertScaleAbs(img_zoomed, alpha=alpha, beta=beta)
        
        
            img_adjusted = makesquare3(img_adjusted)
            finalmask = makesquare2(finalmask)

            img_resized = cv2.resize(img_adjusted, dim, interpolation = cv2.INTER_NEAREST)
            finalmask_resized = cv2.resize(finalmask, dim, interpolation = cv2.INTER_NEAREST)
            
            filepure,extension = splitext(filename)
            
            if randint(0,1) == 0:
                cv2.imwrite(os.path.join(destimagesdir, "{}{}.png".format(filepure, ident)), img_resized)
                cv2.imwrite(os.path.join(destmasksdir, "{}{}.png".format(filepure, ident)), finalmask_resized) 
            else:
                cv2.imwrite(os.path.join(destimagesdir, "{}{}.png".format(filepure, ident)), cv2.flip(img_resized,1))
                cv2.imwrite(os.path.join(destmasksdir, "{}{}.png".format(filepure, ident)), cv2.flip(finalmask_resized,1))    
                
            assocf.write("{:05d}:{}\n".format(count, filename))
            assocf.flush()
            count += 1

        else:
            continue
    assocf.close()  # close the association file once all json files are processed

The function createMasksAugmented above does basically the same thing as createMasks, but the image is randomly zoomed and rotated. The brightness and contrast are randomly adjusted as well. The purpose is to create more variations of the original image for regularization.

The original images and mask images for neural network training can be generated by calling the functions below. In the directory fullpathjson you find the source json files. The parameters fullpathimages and fullpathmasks are the directory names for the destination.

Since we labeled 250 video images and stored them to json files, the functions below create altogether 1000 original and mask images. The function createMasksAugmented can be called even more often for more data.

createMasks(fullpathjson, fullpathimages, fullpathmasks)
createMasksAugmented("a4",fullpathjson, fullpathimages, fullpathmasks)
createMasksAugmented("a5",fullpathjson, fullpathimages, fullpathmasks)
createMasksAugmented("a6",fullpathjson, fullpathimages, fullpathmasks)

In Figure 3 below you find one result of the applied createMasks function. Left, there is an original image. In the middle there is the corresponding mask image. On the right there is the overlaid image. The region of the traffic sign is marked yellow, the street has color red, the street marking is in green and finally the car has a blue color.

Figure 3: Original Image, mask image and overlaid image

Training the model

We used different kinds of UNET models and compared their results. The UNET model from the previous post had mediocre results. For this reason we switched to a UNET model from a different author. It can be found here. The library keras_segmentation gave us much better predictions. However, it does not give us much control over the model itself. This is the reason why we created our own UNET model, inspired by keras_segmentation.

Above we already stated that we have five categories. Each category gets a number and a color assigned. The assignment can be seen in the python dictionary below.

colors = {0 : (0,0,0), 
          1 : (0,0,255), 
          2 : (0,255,0),
          3 : (255,0,0), 
          4 : (0,255,255),         
         }


dim = (256, 256) 

The UNET model we created is shown in the code below. It consists of a contracting and an expanding path. In the contracting path we have five convolution operations, each followed by a batch normalization. The first four are additionally followed by a relu activation, a max pooling and a dropout for regularization. The results of these stages are saved into variables (c0, c1, c2, c3, c4). In the expanding path we implemented the upsampling operations and concatenate their outputs with the variables c2, c1 and c0 from the contracting path. Finally the code performs a softmax operation.

In general, concatenating the upsampled outputs with the saved variables is beneficial during training. Gradients of upper layers often vanish during backpropagation, and the concatenation is a way to counteract this behavior. However, we have not verified whether this is really the case here.

def my_unet(classes):
    
    dropout = 0.4
    input_img = Input(shape=(dim[0], dim[1], 3))
    
    #contracting path
    x = (ZeroPadding2D((1, 1)))(input_img)
    x = (Conv2D(64, (3, 3), padding='valid'))(x)
    x = (BatchNormalization())(x)
    x = (Activation('relu'))(x)
    x = (MaxPooling2D((2, 2)))(x)
    c0 = Dropout(dropout)(x)
    
    x = (ZeroPadding2D((1, 1)))(c0)
    x = (Conv2D(128, (3, 3),padding='valid'))(x)
    x = (BatchNormalization())(x)
    x = (Activation('relu'))(x)
    x = (MaxPooling2D((2, 2)))(x)
    c1 = Dropout(dropout)(x)

    x = (ZeroPadding2D((1, 1)))(c1)
    x = (Conv2D(256, (3, 3), padding='valid'))(x)
    x = (BatchNormalization())(x)
    x = (Activation('relu'))(x)
    x = (MaxPooling2D((2, 2)))(x)
    c2 = Dropout(dropout)(x)
    
    x = (ZeroPadding2D((1, 1)))(c2)
    x = (Conv2D(256, (3, 3), padding='valid'))(x)
    x = (BatchNormalization())(x)
    x = (Activation('relu'))(x)
    x = (MaxPooling2D((2, 2)))(x)
    c3 = Dropout(dropout)(x)
    
    x = (ZeroPadding2D((1, 1)))(c3)
    x = (Conv2D(512, (3, 3), padding='valid'))(x)
    c4 = (BatchNormalization())(x)

    #expanding path
    x = (UpSampling2D((2, 2)))(c4)
    x = (concatenate([x, c2], axis=-1))
    x = Dropout(dropout)(x)
    x = (ZeroPadding2D((1, 1)))(x)
    x = (Conv2D(256, (3, 3), padding='valid', activation='relu'))(x)
    e4 = (BatchNormalization())(x)
    
    x = (UpSampling2D((2, 2)))(e4)
    x = (concatenate([x, c1], axis=-1))
    x = Dropout(dropout)(x)
    x = (ZeroPadding2D((1, 1)))(x)
    x = (Conv2D(256, (3, 3), padding='valid', activation='relu'))(x)
    e3 = (BatchNormalization())(x)
    
    x = (UpSampling2D((2, 2)))(e3)
    x = (concatenate([x, c0], axis=-1))
    x = Dropout(dropout)(x)
    x = (ZeroPadding2D((1, 1)))(x)
    x = (Conv2D(64, (3, 3), padding='valid', activation='relu'))(x)
    x = (BatchNormalization())(x)

    x = (UpSampling2D((2, 2)))(x)
    x = Conv2D(classes, (3, 3), padding='same')(x)
    
    x = (Activation('softmax'))(x)
    
    model = Model(input_img, x)
        
    return model
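
The pairing of upsampling outputs and saved variables can be checked without Keras. The sketch below only traces the edge length of the feature maps through my_unet, assuming that a 3x3 convolution with one pixel of zero padding keeps the size, that max pooling halves it and that upsampling doubles it. The function name unet_spatial_sizes is hypothetical and not part of the original code.

```python
def unet_spatial_sizes(size=256):
    """Trace the feature-map edge length through my_unet (sketch)."""
    sizes = {}
    x = size
    # contracting path: the padded 3x3 convolution keeps the size,
    # each 2x2 max pooling halves it
    for name in ("c0", "c1", "c2", "c3"):
        x = x // 2
        sizes[name] = x
    sizes["c4"] = x  # fifth convolution, no pooling afterwards

    # expanding path: each upsampling doubles the size and must match
    # the saved tensor it is concatenated with
    for skip in ("c2", "c1", "c0"):
        x = x * 2
        assert x == sizes[skip], (skip, x, sizes[skip])

    x = x * 2  # final upsampling back to the input resolution
    sizes["output"] = x
    return sizes


sizes = unet_spatial_sizes(256)
print(sizes)
```

For a 256 pixel input the trace ends at 256 again, which is why the softmax output can be compared pixel by pixel with the masks.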

As described in the last post, we use for training a different kind of mask representation than the masks created by createMasks. In our case, the UNET model needs masks in the shape 256x256x5. The function createMasks creates masks in shape 256×256 though. For training it is preferable to have one layer for each category. This is why we implemented the function makecolormask. It is the same function as described in the last post, but it is optimized and performs better. See code below.

def makecolormask(mask):
    ret_mask = np.zeros((mask.shape[0], mask.shape[1], len(colors)), 'uint8')
    
    for col in range(len(colors)):
        ret_mask[:, :, col] = (mask == col).astype(int)
                       
    return ret_mask
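
As a small illustration of the conversion, the sketch below builds the same one-layer-per-category representation with NumPy broadcasting on a tiny 2x2 mask. The name one_hot_mask and the constant NUM_CLASSES are assumptions made for this example; the value 5 mirrors len(colors) from the dictionary above.

```python
import numpy as np

NUM_CLASSES = 5  # assumption: equals len(colors) in the text above


def one_hot_mask(mask, num_classes=NUM_CLASSES):
    # compare every pixel against every category index at once
    return (mask[..., None] == np.arange(num_classes)).astype(np.uint8)


mask = np.array([[0, 1], [4, 2]], dtype=np.uint8)
encoded = one_hot_mask(mask)
print(encoded.shape)   # (2, 2, 5)
print(encoded[1, 0])   # the pixel with category 4 has a one in layer 4
```

Each pixel carries exactly one 1 across the layers, so taking the argmax over the last axis returns the original mask.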

Below we define callback functions for the training period. The callback function EarlyStopping stops the training if the validation loss has not improved for ten epochs. The callback function ReduceLROnPlateau reduces the learning rate by a factor of 0.1 if the validation loss has not improved for three epochs. And finally the callback function ModelCheckpoint saves a checkpoint of the UNET model’s weights as soon as the validation loss has improved.

modeljsonname="model-chkpt.json"
modelweightname="model-chkpt.h5"

callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ModelCheckpoint(os.path.join(path, dirmodels,modelweightname), verbose=1, save_best_only=True, save_weights_only=True)
]
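
How the two patience values interact can be replayed without Keras. The sketch below is a simplified model of the callbacks' behavior (it ignores options such as min_delta and cooldown) and is not the Keras implementation. The starting learning rate of 0.001, the loss values and the name simulate_callbacks are assumptions made for this example.

```python
def simulate_callbacks(val_losses, lr=1e-3, factor=0.1, lr_patience=3,
                       min_lr=1e-5, stop_patience=10):
    """Simplified replay of ReduceLROnPlateau and EarlyStopping decisions."""
    best = float("inf")
    lr_wait = 0    # epochs without improvement since the last reset
    stop_wait = 0  # epochs without improvement since the best epoch
    history = []
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best:
            best = loss
            lr_wait = 0
            stop_wait = 0
        else:
            lr_wait += 1
            stop_wait += 1
            if lr_wait >= lr_patience:
                lr = max(lr * factor, min_lr)  # never go below min_lr
                lr_wait = 0
        history.append((epoch, lr))
        if stop_wait >= stop_patience:
            break  # early stopping
    return history


# hypothetical validation losses: two improving epochs, then a plateau
losses = [1.0, 0.9] + [0.95] * 20
history = simulate_callbacks(losses)
for epoch, lr in history:
    print(epoch, lr)
```

With these made-up losses the learning rate is reduced after every three epochs on the plateau until it reaches min_lr, and the replay stops after ten epochs without improvement.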

The code below is used to load batches of original images and mask images into lists, which are fed into the training process. The function generatebatchdata has been described in the last post, so we omit the description since it is nearly identical.

def generatebatchdata(batchsize, fullpathimages, fullpathmasks):
  
    imagenames = os.listdir(fullpathimages)
    imagenames.sort()

    masknames = os.listdir(fullpathmasks)
    masknames.sort()

    assert(len(imagenames) == len(masknames))
    
    for i in range(len(imagenames)):
        assert(imagenames[i] == masknames[i])

    while True:
        batchstart = 0
        batchend = batchsize    
        
        while batchstart < len(imagenames):
            
            imagelist = []
            masklist = []
            
            limit = min(batchend, len(imagenames))

            for i in range(batchstart, limit):
                if imagenames[i].endswith(".png"):
                    imagelist.append(cv2.imread(os.path.join(fullpathimages,imagenames[i]),cv2.IMREAD_COLOR ))
                if masknames[i].endswith(".png"):
                    masklist.append(makecolormask(cv2.imread(os.path.join(fullpathmasks,masknames[i]),cv2.IMREAD_UNCHANGED )))


            train_data = np.array(imagelist, dtype=np.float32)
            train_mask= np.array(masklist, dtype=np.float32)

            train_data /= 255.0
    
            yield (train_data,train_mask)    

            batchstart += batchsize   
            batchend += batchsize

The generators generator_train and generator_valid are instantiated from generatebatchdata below. We use a batch size of two. With larger batch sizes the training process crashed, which we assume was caused by running out of memory on the graphics card. Setting the batch size to a lower number worked fine. The method fit_generator starts the training process. The training took around ten minutes on an NVIDIA 2070 graphics card. The training accuracy reached 98% and the validation accuracy reached 94%. This gap indicates overfitting, however we are aware that we have very few images to train with. Originally we created only 250 masks from the videos. The rest of the training data resulted from data augmentation.

generator_train = generatebatchdata(2, fullpathimages, fullpathmasks)
generator_valid = generatebatchdata(2, fullpathimagesvalid, fullpathmasksvalid)
model.fit_generator(generator_train,steps_per_epoch=700, epochs=10, callbacks=callbacks, validation_data=generator_valid, validation_steps=100)
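
Because generatebatchdata loops forever, steps_per_epoch decides how many batches count as one epoch. The sketch below shows the usual arithmetic for one pass over all images. The image counts are hypothetical: 1400 is used only because it reproduces the value 700 at a batch size of two, and the helper name steps_per_epoch is not part of the original code.

```python
import math


def steps_per_epoch(num_images, batchsize):
    # number of generator batches needed to see every image once
    return math.ceil(num_images / batchsize)


print(steps_per_epoch(1400, 2))  # 700
print(steps_per_epoch(5, 2))     # 3, the last batch holds a single image
```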

After training we saved the model. First the model structure is saved to a json file. Second the weights are saved by using the save_weights method.

model_json = model.to_json()
with open(os.path.join(path, dirmodels,modeljsonname), "w") as json_file:
    json_file.write(model_json)

model.save_weights(os.path.join(path, dirmodels,modelweightname))

The predict method of the model predicts masks from original images. It returns a list of masks, see code below. The parameter test_data is a list of original images.

predictions_test = model.predict(test_data, batch_size=1, verbose=1)

The masks in predictions_test from the model prediction have a 256x256x5 shape. Each layer represents a category. It is not convenient to display this mask, so predictedmask converts it back to a 256×256 representation. The code below shows predictedmask.

def predictedmask(masklist):
    y_list = []
    for mask in masklist:
        assert mask.shape == (dim[0], dim[1], len(colors))
        # pick, for every pixel, the category with the highest score
        imgret = mask.argmax(axis=2).astype(np.uint8)
        
        y_list.append(imgret)
                    
    return y_list
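
The same conversion can be applied to a whole batch at once. The sketch below runs an argmax over the category axis of randomly generated scores. The array sizes and the name class_index_masks are assumptions made for this example and do not come from the trained model.

```python
import numpy as np


def class_index_masks(predictions):
    # predictions: (batch, height, width, categories) array of scores
    return predictions.argmax(axis=-1).astype(np.uint8)


rng = np.random.default_rng(0)
scores = rng.random((2, 4, 4, 5))
scores /= scores.sum(axis=-1, keepdims=True)  # each pixel sums to one
masks = class_index_masks(scores)
print(masks.shape)  # (2, 4, 4)
```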

Another utility function for display purposes is makemask, see code below. It converts the 256×256 mask representation to a color mask representation by using the python dictionary colors. Again this is a function from the last post, however optimized for performance.

def makemask(mask):
    ret_mask = np.zeros((mask.shape[0], mask.shape[1], 3), 'uint8')

    for col in range(len(colors)):
        layer = mask[:, :] == col
        ret_mask[:, :, 0] += ((layer)*(colors[col][0])).astype('uint8')
        ret_mask[:, :, 1] += ((layer)*(colors[col][1])).astype('uint8')
        ret_mask[:, :, 2] += ((layer)*(colors[col][2])).astype('uint8')
    
    return ret_mask
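
An alternative way to express this conversion is a lookup table, where NumPy indexing replaces every category number by its color in one step. This is only a sketch of the idea, not the code used in the project. The names lut and colorize are hypothetical, and the dictionary is copied from the text above.

```python
import numpy as np

colors = {0: (0, 0, 0), 1: (0, 0, 255), 2: (0, 255, 0),
          3: (255, 0, 0), 4: (0, 255, 255)}  # copied from the text above

# row i of the table holds the color of category i
lut = np.array([colors[i] for i in range(len(colors))], dtype=np.uint8)


def colorize(mask):
    # integer indexing replaces every category number by its color triple
    return lut[mask]


mask = np.array([[0, 4], [2, 1]], dtype=np.uint8)
colored = colorize(mask)
print(colored.shape)  # (2, 2, 3)
print(colored[0, 1])  # color of category 4
```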

In Figure 4 you find one prediction from an original image. The original image is on the left side. The predicted mask in the middle, and the combined image on the right side.

Figure 4: Original image, predicted mask and overlaid image

You can see that the street was recognized pretty well, however parts of the sky were taken as street as well. The traffic sign was labeled too, but not completely. As mentioned before, we assume that we need much more training data to get better results.

Creating an augmented video

The code below creates an augmented video from an original video the students made in the beginning of the project. The name of the original video is video3.mp4. The name of the augmented video is videounet-3-drop.mp4. The method read of VideoCapture reads each single image from video3.mp4 and stores it into the local variable frame. The image in frame is normalized and the mask image is predicted. The function predictedmask converts the mask into a 256×256 representation, and the function makemask creates a color image from it. Finally frame and the color mask are overlaid and saved to the new augmented video videounet-3-drop.mp4.

cap = cv2.VideoCapture(os.path.join(path,"videos",'video3.mp4'))
if (cap.isOpened() == True): 
    
    print("Opening video stream or file")
    out = cv2.VideoWriter(os.path.join(path,"videos",'videounet-3-drop.mp4'),cv2.VideoWriter_fourcc(*'MP4V'), 25, (256,256))
    while(cap.isOpened()):
        ret, frame = cap.read()
        
        if ret == False:
            break

        test_data = []
        test_data.append(frame)
        test_data = np.array(test_data, dtype=np.float32)
        test_data /= 255.0
        
        predicted = model.predict(test_data, batch_size=1, verbose=0)
        assert(len(predicted) == 1)
        pmask = predictedmask(predicted)
        
        if ret == True:
            
            mask = makemask(pmask[0])
  
            weighted = np.zeros((dim[0], dim[1], 3), 'uint8')
            cv2.addWeighted(frame, 0.6, mask, 0.4, 0, weighted)
 

            out.write(weighted)
            cv2.imshow('Frame',weighted)
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break

    out.release()
            
cap.release()
cv2.destroyAllWindows()
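
The overlay itself is a weighted sum of the frame and the color mask. The NumPy sketch below reproduces the 0.6/0.4 mix on a tiny example. It is meant to illustrate what the addWeighted call computes and may differ from OpenCV in rounding details. The name blend and the pixel values are assumptions.

```python
import numpy as np


def blend(frame, mask, frame_weight=0.6, mask_weight=0.4):
    # weighted sum in floating point, then round and clip back to 8 bit
    mixed = (frame_weight * frame.astype(np.float32)
             + mask_weight * mask.astype(np.float32))
    return np.clip(np.rint(mixed), 0, 255).astype(np.uint8)


frame = np.full((2, 2, 3), 100, dtype=np.uint8)  # uniform gray frame
mask = np.zeros((2, 2, 3), dtype=np.uint8)
mask[0, 0] = (0, 0, 255)                         # one colored mask pixel
weighted = blend(frame, mask)
print(weighted[0, 0])  # pixel covered by the mask
print(weighted[1, 1])  # pixel outside the mask
```

Pixels outside the mask are only darkened to 60 percent of their value, while pixels covered by the mask are tinted with the category color.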

Conclusion

In Figure 5 you see a snapshot of the augmented video. The street is marked pretty well. You can even see how the sidewalk is not marked as a street. On the right side you find a traffic sign in yellow. The prediction works well here too. On the left side you find a blue marking. The color blue categorizes cars. However there are no cars in the original image and therefore this is a misinterpretation. The street markings in green are not shown very well. In some parts of the augmented video they appear very strong, and in some parts you see only fragments, just like in Figure 5.

Figure 5: Snapshot of the augmented video

Overall we are very happy with the result of this project, considering that we have only labeled 250 images. We are convinced that we will get better results as soon as we have more images for training. To mitigate the problem we used data augmentation, so from the 250 images we created around 3000 images. We also randomly flipped the images horizontally in a code version which was not discussed here.

We think that we have too much overfitting, which can only be solved with more training data. Due to the time limit, we had to restrict ourselves to less training data.

Acknowledgement

Special thanks to the class of Summer Semester 2020 Design Cyber Physical Systems for providing the videos taken while driving and for labeling the images for the training data used for the neural network. We appreciate this very much since this is a lot of effort.

Also special thanks to the University of Applied Science Albstadt-Sigmaringen for providing the infrastructure and the appliances to enable this class and this research.

Image Processing for an Autonomous Guided Vehicle in a Learn-Factory

In our school we offer a master class for students from different faculties such as mechanical engineering, textile engineering and computer science. The content of the class involves modern topics in industry with practical work; the class is called Research Project Industry 4.0. During the summer semester 2020 we had 13 students enrolled; most of them are textile engineers and a few are computer scientists.

The textile engineering faculty owns a room with different machines used for textile processing. They call this room learn-factory, see Figure 1. The intention is to show students how to process textile goods. Another intention is to produce textiles in small numbers ordered from industry.

Figure 1: Learn-factory

In Figure 1 you see stations consisting of stitching machines, sewing machines, cutters, ovens etc. Processing textile goods nowadays still involves a lot of handcrafting; however, to stay competitive, we need a certain amount of automation to lower labor cost. The idea is to use an autonomous guided vehicle to transport pre-processed textile goods from one station to another. This would relieve the workers from leaving their station, so they can concentrate on their textile processing tasks.

We decided that the autonomous guided vehicle will orientate through the learn-factory with a simple camera system. Images from the camera system need to be segmented into parts, such as floor and machine stations. The floor segments of the images are important, so the vehicle knows where to drive, and the machine stations locations are important to find the destination. Segmenting images is a well known problem in machine learning (semantic segmentation) and can be addressed by using neural networks.

Preparation

In order to use a neural network to solve the segmentation problem, we need training data. Fortunately this time we have many students helping to create data. The first step is to create images from a perspective of an autonomous guided vehicle. The students mounted a smart phone on a trolley and took images while moving around the trolley, see Figure 2.

Figure 2: Smart Phone on Trolley

In Figure 3 you see the layout of the learn-factory created by the students. You see the floor in blue, and the stations with their names as white rectangles. The learn-factory was divided into parts from where the images have to be taken. Each part was assigned to one student.

Figure 3: Layout of Learn-Factory

We set the goal, that each student has to take 1000 images from different angles. With 13 participating students, we gathered 13000 images, which we assumed to be a good coverage of the complete learn-factory.

Creating Training Data

Figure 3 shows the layout of the learn-factory with 16 machine and working stations. They are categorized as “1-Nadel1”, “Tape1” etc. You find the complete list of categories in the python dictionary classes below. Also the floor (“Boden”) and the window (“Fenster”) have their assigned categories. Altogether there are 20 categories. A number is assigned to each machine and working station. The number 0 is the category for everything that belongs to none of the categories in the dictionary classes.

classes = {"1-Nadel1" : 1, 
          "1-Nadel2" : 2,
          "Tape1" : 3, 
          "Tape2" : 4, 
          "Boden" : 5, 
          "Tisch0" : 6,         
          "Tisch1" : 7,
          "Tisch2" : 8,
          "Tisch3" : 9,
          "Tisch4" : 10,
          "Tisch5" : 11,
          "M-blau" : 12,
          "M-Blau" : 12,          
          "M-gelb" : 13,
          "M-Gelb" : 13,
          "M-rot" : 14,
          "M-Rot" : 14,
          "4-FÜW" : 15,
          "4-FUEW" : 15,
          "4-FÃœW" : 15,          
          "Fenster" : 16,
          "Ofen" : 17,   
          "Cutter" : 18,
          "Heizung" : 19,          
         }
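
The dictionary contains several spellings for some labels, so the number of entries is larger than the number of categories. The short check below, with the dictionary copied from above in compact form, counts both. The names BACKGROUND and distinct_ids are assumptions made for this example.

```python
classes = {"1-Nadel1": 1, "1-Nadel2": 2, "Tape1": 3, "Tape2": 4, "Boden": 5,
           "Tisch0": 6, "Tisch1": 7, "Tisch2": 8, "Tisch3": 9, "Tisch4": 10,
           "Tisch5": 11, "M-blau": 12, "M-Blau": 12, "M-gelb": 13, "M-Gelb": 13,
           "M-rot": 14, "M-Rot": 14, "4-FÜW": 15, "4-FUEW": 15, "4-FÃœW": 15,
           "Fenster": 16, "Ofen": 17, "Cutter": 18, "Heizung": 19}

BACKGROUND = 0  # everything that carries no label

# different spellings of a label map to the same category number
distinct_ids = sorted(set(classes.values()) | {BACKGROUND})
print(len(classes))       # 24 label spellings
print(len(distinct_ids))  # 20 categories including the background
```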

As mentioned above, every student created 1000 images from the learn-factory. The images need to be processed further before training the neural network. We need to tell which parts of the image belong to which category. See the categories in the code above.

The labelme website offers a downloadable tool to mask parts of the image and assign them to a label (or a category). Figure 4 shows a screenshot of the tool labelme. The student can create closed polygons by marking the vertices with a mouse click. Then the students assign a category to the polygon (Polygon Labels in Figure 4, right side).

Figure 4: Labelme

The tool labelme saves for each processed image a json file. The json file contains the list of polygons, described by their vertices, the names of the categories, and the image itself, which is saved as a base64 encoded image.
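
To make the file layout more concrete, the sketch below builds a heavily shortened, hypothetical record with the fields used later in this post (imagePath, imageData, shapes with label and points) and reads it back with the standard library. The file name, the label coordinates and the image bytes are made up; a real file stores a complete base64 encoded image.

```python
import base64
import json

# hypothetical, shortened record in the layout described above
record_text = json.dumps({
    "imagePath": "example.jpg",
    "imageData": base64.b64encode(b"not a real image").decode("ascii"),
    "shapes": [
        {"label": "Boden", "points": [[0, 0], [10, 0], [10, 5]]},
        {"label": "Tisch1", "points": [[2, 2], [4, 2], [4, 4], [2, 4]]},
    ],
})

data = json.loads(record_text)
image_bytes = base64.b64decode(data["imageData"])  # raw bytes of the image
polygons = [(shape["label"], len(shape["points"])) for shape in data["shapes"]]
print(polygons)  # label and number of vertices for each polygon
```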

Figure 5: Creating Mask Images

The next step is to process the json files to extract the polygons and their names to create mask images, see Figure 5. On the left side of Figure 5 you see the tool labelme. In the middle you find a processed mask image, created by the polygons. On the right side there is an overlaid image from the original image and the mask image.

Data Processing

Each of the 13 students labeled 1000 images with the tool labelme. The tool labelme generates json files which can be parsed easily with python to extract the vertices of the labeled polygons. Each polygon is assigned to a category (this information can also be extracted from the json file) and each category is assigned a color. The python dictionary colors in the code below shows the number/RGB-color assignment.

colors = {0 : (0,0,0), 
          1 : (0,0,55), 
          2 : (0,0,127),
          3 : (0,55,0), 
          4 : (0,127,0), 
          5 : (0,127,127), 
          6 : (40,0,0),         
          7 : (80,0,0),
          8 : (120,0,0),
          9 : (160,0,0),
          10 : (200,0,0),
          11 : (240,0,0),
          12 : (127,0,127),         
          13 : (127,127,0),
          14 : (0, 0, 255),
          15 : (0, 255 ,0),          
          16 : (0, 255 ,255),
          17 : (100, 0 ,0),   
          18 : (200,0,0),
          19 : (255,0,0),          
         }

dim = (128, 128) 

The function createMasks below iterates through the list of json files (13000 files!) located in sourcejsonsdir. In each iteration step createMasks loads the content of the json file into data. It decodes the image tagged by imageData using the methods b64decode and imdecode and stores it into the variable img. The code verifies that the image is square (non-square images are skipped) and resizes it to 128×128.

Next, createMasks is iterating through all polygons in the json file, which are tagged as shapes. Again it iterates through all vertices of each shape and creates a polygon (rr and cc) from them. A mask image is created from the polygon and appended to a list of masks after it is resized to 128×128. The function createMasks sets the values of the pixels which are inside the polygon to the number which corresponds to the category:

mask_orig[rr,cc] = classes[shape['label']]

Finally all masks are added to finalmask by using OpenCV’s threshold and bitwise_and operation. The function createMasks then stores the original image into destimagesdir and the mask image into destmasksdir.

def createMasks(sourcejsonsdir, destimagesdir, destmasksdir):

    assocf = open(os.path.join(path,"assoc_orig.txt"), "w")
    count = 0
    directory = sourcejsonsdir
    for filename in os.listdir(directory):
        if filename.endswith(".json"):
            print("{}:{}".format(count,os.path.join(directory, filename)))
            
            # the with-statement closes the json file again, also when the image is skipped
            with open(os.path.join(directory, filename)) as f:
                data = json.load(f)
            img_arr = data['imageData']  
            imgdata = base64.b64decode(img_arr)

            img = cv2.imdecode(np.frombuffer(imgdata, dtype=np.uint8), flags=cv2.IMREAD_COLOR)
   
            assert (img.shape[0] > dim[0])
            assert (img.shape[1] > dim[1])
           
            if img.shape[0] != img.shape[1]:
                print("shape is wrong: {},{} ... skipping".format(img.shape[0],img.shape[1]))
                continue

            img_resized = cv2.resize(img, dim, interpolation = cv2.INTER_NEAREST)

            finalmask = np.zeros((img_resized.shape[0], img_resized.shape[1]), 'uint8')
            mthresh = np.zeros((img_resized.shape[0], img_resized.shape[1]), 'uint8')
            masks=[]

            for shape in data['shapes']:
                assert(shape['label'] in classes)

                vertices = np.array([[point[1],point[0]] for point in shape['points']])
                vertices = vertices.astype(int)

                rr, cc = polygon(vertices[:,0], vertices[:,1], img.shape)
                mask_orig = np.zeros((img.shape[0], img.shape[1]), 'uint8')
                mask_orig[rr,cc] = classes[shape['label']]
                masks.append(cv2.resize(mask_orig, dim, interpolation = cv2.INTER_NEAREST))

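            for m in masks:
                # threshold 0: every labeled pixel (including category 1) counts as inside the polygon
                _,mthresh = cv2.threshold(m,0,255,cv2.THRESH_BINARY_INV)
                # clear the pixels covered by this polygon, then write its category number
                finalmask = cv2.bitwise_and(finalmask,finalmask,mask = mthresh)
                finalmask += m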

            cv2.imwrite(os.path.join(destimagesdir, "{:05d}o.png".format(count)), img_resized)
            cv2.imwrite(os.path.join(destmasksdir, "{:05d}o.png".format(count)), finalmask)

            assocf.write("{:05d}o:{}\n".format(count, data['imagePath']))
            assocf.flush()
            count += 1

        else:
            continue
    assocf.close()
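
The way the single polygon masks are merged can be illustrated with plain NumPy indexing. The sketch below follows the rule that a later polygon overwrites an earlier one wherever they overlap. It is not the OpenCV code from above, and the names compose_masks, floor and table as well as the tiny 3x3 arrays are assumptions made for this example.

```python
import numpy as np


def compose_masks(masks):
    final = np.zeros_like(masks[0])
    for m in masks:
        inside = m > 0
        final[inside] = m[inside]  # later polygons overwrite earlier ones
    return final


floor = np.full((3, 3), 5, dtype=np.uint8)  # category 5 covers everything
table = np.zeros((3, 3), dtype=np.uint8)
table[1, 1] = 6                             # category 6 in the center
final = compose_masks([floor, table])
print(final)
```

The center pixel ends up with category 6 and not with the sum of both category numbers, which is the purpose of clearing the covered pixels before adding a mask.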

13000 mask images can now be used for training, just by processing the available data. However, we decided to generate more training data by using data augmentation. The function createMasksAugmented below works in a similar way as createMasks, therefore we just explain the differences. The function createMasksAugmented rotates the original image by a random angle between -3 and 3 degrees using the method imutils.rotate_bound. It then zooms into the rotated image by cropping a centered region with a random size (between 80 and 90 percent of the rotated image). The result is stored in img_zoomed. The mask image is generated the same way as in the function createMasks, however createMasksAugmented rotates and zooms the mask image in the same way as the original image. More data augmentation is applied by OpenCV’s convertScaleAbs method, which randomly modifies contrast and brightness of the rotated and zoomed image before it is saved into the destimagesdir directory. The rotated and zoomed mask image is saved into the directory destmasksdir.

def createMasksAugmented(sourcejsonsdir, destimagesdir, destmasksdir):

    assocf = open(os.path.join(path,"assoc_augmented.txt"), "w")

    count = 0
    directory = sourcejsonsdir
    for filename in os.listdir(directory):
        if filename.endswith(".json"):
            print("{}:{}".format(count,os.path.join(directory, filename)))

            # the with-statement closes the json file again, also when the image is skipped
            with open(os.path.join(directory, filename)) as f:
                data = json.load(f)
            img_arr = data['imageData']  
            imgdata = base64.b64decode(img_arr)

            img = cv2.imdecode(np.frombuffer(imgdata, dtype=np.uint8), flags=cv2.IMREAD_COLOR)
            
            assert (img.shape[0] > dim[0])
            assert (img.shape[1] > dim[1])
            
            if img.shape[0] != img.shape[1]:
                print("shape is wrong: {},{} ... skipping".format(img.shape[0],img.shape[1]))
                continue
            
            zoom = randint(80,90)/100.0
            angle = (2*random()-1)*3.0
            img_rotated = imutils.rotate_bound(img, angle)
            
            xf = int(img_rotated.shape[0]*zoom)
            yf = int(img_rotated.shape[1]*zoom)            
            
            img_zoomed = np.zeros((xf, yf, img_rotated.shape[2]), 'uint8')
            img_zoomed[:,:,:] = img_rotated[int((img_rotated.shape[0]-xf)/2):int((img_rotated.shape[0]-xf)/2)+xf,int((img_rotated.shape[1]-yf)/2):int((img_rotated.shape[1]-yf)/2)+yf,:] 
            

            img_resized = cv2.resize(img_zoomed, dim, interpolation = cv2.INTER_NEAREST)

            finalmask = np.zeros((img_resized.shape[0], img_resized.shape[1]), 'uint8')
            mthresh = np.zeros((img_resized.shape[0], img_resized.shape[1]), 'uint8')
            masks=[]

            for shape in data['shapes']:
                assert(shape['label'] in classes)

                vertices = np.array([[point[1],point[0]] for point in shape['points']])
                vertices = vertices.astype(int)

                rr, cc = polygon(vertices[:,0], vertices[:,1], img.shape)
                mask_orig = np.zeros((img.shape[0], img.shape[1]), 'uint8')
                mask_orig[rr,cc] = classes[shape['label']]
                
                mask_rotated = imutils.rotate_bound(mask_orig, angle)
                
                mask_zoomed = np.zeros((xf, yf), 'uint8')
                mask_zoomed[:,:] = mask_rotated[int((img_rotated.shape[0]-xf)/2):int((img_rotated.shape[0]-xf)/2)+xf,int((img_rotated.shape[1]-yf)/2):int((img_rotated.shape[1]-yf)/2)+yf] 
                
                masks.append(cv2.resize(mask_zoomed, dim, interpolation = cv2.INTER_NEAREST))

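            for m in masks:
                # threshold 0: every labeled pixel (including category 1) counts as inside the polygon
                _,mthresh = cv2.threshold(m,0,255,cv2.THRESH_BINARY_INV)
                # clear the pixels covered by this polygon, then write its category number
                finalmask = cv2.bitwise_and(finalmask,finalmask,mask = mthresh)
                finalmask += m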

            alpha = 0.8 + 0.4*random()
            beta = int(random()*15)
    
            img_adjusted = cv2.convertScaleAbs(img_resized, alpha=alpha, beta=beta)
        
            cv2.imwrite(os.path.join(destimagesdir, "{:05d}a.png".format(count)), img_adjusted)
            cv2.imwrite(os.path.join(destmasksdir, "{:05d}a.png".format(count)), finalmask) 

            assocf.write("{:05d}a:{}\n".format(count, data['imagePath']))
            assocf.flush()
            count += 1

        else:
            continue
    assocf.close()
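
The zoom step is a centered crop, and it has to use the same window for the image and for its mask. The sketch below shows this with NumPy slicing only. The helper name center_crop and the 10x10 arrays are assumptions, and the rotation is left out.

```python
import numpy as np


def center_crop(arr, zoom):
    # keep a centered window covering `zoom` of each side length
    h, w = arr.shape[:2]
    ch, cw = int(h * zoom), int(w * zoom)
    top, left = (h - ch) // 2, (w - cw) // 2
    return arr[top:top + ch, left:left + cw]


image = np.zeros((10, 10, 3), dtype=np.uint8)
mask = np.zeros((10, 10), dtype=np.uint8)
mask[5, 5] = 7  # one labeled pixel

zoom = 0.8      # the original code draws this value randomly
image_zoomed = center_crop(image, zoom)
mask_zoomed = center_crop(mask, zoom)
print(image_zoomed.shape, mask_zoomed.shape)
```

Because both arrays are cropped with the same window, the labeled pixel keeps its position relative to the image content.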

The function createMasksAugmented was applied once to all available data, so at the end we had 26000 images for training. Since createMasksAugmented uses random numbers for rotation, zooming, brightness and contrast, it could be applied more than once.

The UNET Model

We train our data on a UNET model, because we had good semantic segmentation results with this kind of model in previous projects. The neural network was already described in previous posts. Originally we took the code from here, but modified it slightly to match our purpose. The first function is conv2d_block, which executes two convolution operations in a row, each followed by an optional batch normalization and a relu activation, see code below.

def conv2d_block(input_tensor, n_filters, kernel_size=3, batchnorm=True):
    # first layer
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), kernel_initializer="he_normal",
               padding="same")(input_tensor)
    if batchnorm:
        x = BatchNormalization()(x)
    x = Activation("relu")(x)
    # second layer
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), kernel_initializer="he_normal",
               padding="same")(x)
    if batchnorm:
        x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

The function get_unet below builds up the model by calling conv2d_block and MaxPooling2D several times (this is called the contracting path). The outputs of the conv2d_block calls are stored in variables (c1, c2, c3, c4), which are fed later into the expansive path. In the expansive path, get_unet calls transposed convolution operations (Conv2DTranspose) and concatenates the saved variables (c1, c2, c3, c4) from the contracting path with their outputs. In the line:

c10 = Conv2D(len(colors), (1, 1), activation="softmax") (c9)

you find the minor change we made. The output of the model will be an image of size 128×128 with 20 layers (which is len(colors)).

def get_unet(input_img, n_filters=16, dropout=0.5, batchnorm=True):
    # contracting path
    c1 = conv2d_block(input_img, n_filters=n_filters*1, kernel_size=3, batchnorm=batchnorm)
    p1 = MaxPooling2D((2, 2)) (c1)
    p1 = Dropout(dropout*0.5)(p1)

    c2 = conv2d_block(p1, n_filters=n_filters*2, kernel_size=3, batchnorm=batchnorm)
    p2 = MaxPooling2D((2, 2)) (c2)
    p2 = Dropout(dropout)(p2)

    c3 = conv2d_block(p2, n_filters=n_filters*4, kernel_size=3, batchnorm=batchnorm)
    p3 = MaxPooling2D((2, 2)) (c3)
    p3 = Dropout(dropout)(p3)

    c4 = conv2d_block(p3, n_filters=n_filters*8, kernel_size=3, batchnorm=batchnorm)
    p4 = MaxPooling2D(pool_size=(2, 2)) (c4)
    p4 = Dropout(dropout)(p4)
    
    c5 = conv2d_block(p4, n_filters=n_filters*16, kernel_size=3, batchnorm=batchnorm)
    
    # expansive path
    u6 = Conv2DTranspose(n_filters*8, (3, 3), strides=(2, 2), padding='same') (c5)
    u6 = concatenate([u6, c4])
    u6 = Dropout(dropout)(u6)
    c6 = conv2d_block(u6, n_filters=n_filters*8, kernel_size=3, batchnorm=batchnorm)

    u7 = Conv2DTranspose(n_filters*4, (3, 3), strides=(2, 2), padding='same') (c6)
    u7 = concatenate([u7, c3])
    u7 = Dropout(dropout)(u7)
    c7 = conv2d_block(u7, n_filters=n_filters*4, kernel_size=3, batchnorm=batchnorm)

    u8 = Conv2DTranspose(n_filters*2, (3, 3), strides=(2, 2), padding='same') (c7)
    u8 = concatenate([u8, c2])
    u8 = Dropout(dropout)(u8)
    c8 = conv2d_block(u8, n_filters=n_filters*2, kernel_size=3, batchnorm=batchnorm)

    u9 = Conv2DTranspose(n_filters*1, (3, 3), strides=(2, 2), padding='same') (c8)
    u9 = concatenate([u9, c1], axis=3)
    u9 = Dropout(dropout)(u9)
    c9 = conv2d_block(u9, n_filters=n_filters*1, kernel_size=3, batchnorm=batchnorm)
    
    c10 = Conv2D(len(colors), (1, 1), activation="softmax") (c9)
   
    model = Model(inputs=[input_img], outputs=[c10])
    return model

We mentioned above that we created mask images of size 128×128 from the json files. At each pixel position, the category is simply stored as a number between 0 and 19. The neural network, however, has an output of 128x128x20, so for training we use a different mask representation: each of the 20 layers of the 128x128x20 image is assigned to one category. If a pixel in a layer is one, an object of that layer's category is at this position; if the pixel is zero, no object of that category is at this position. This is commonly called one-hot encoding. The function makecolormask below converts the 128×128 masks (created from the json files) to 128x128x20 masks. Note that there are more efficient ways to convert the masks, but this will be discussed in a later post.

def makecolormask(mask):
    ret_mask = np.zeros((mask.shape[0], mask.shape[1], len(colors)), 'uint8')
    for i in range(mask.shape[0]):
        for j in range(mask.shape[1]):
                assert(mask[i,j] < len(colors))
                ret_mask[i,j,mask[i,j]] = 1
    return ret_mask
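
The conversion described above can also be written without the two nested loops. The following is only a minimal sketch, not the code used in the project: the function name one_hot_mask and the parameter n_classes are hypothetical, and n_classes stands in for len(colors).

```python
import numpy as np

def one_hot_mask(mask, n_classes):
    """Convert an (H, W) mask of class numbers into an (H, W, n_classes) mask of zeros and ones."""
    # every class number must have its own layer
    assert mask.max() < n_classes
    # row k of the identity matrix is the one-hot vector of class k;
    # indexing with the whole mask looks up one row per pixel
    return np.eye(n_classes, dtype=np.uint8)[mask]

# tiny 2x2 example with three classes instead of twenty
mask = np.array([[0, 2], [1, 2]], dtype=np.uint8)
encoded = one_hot_mask(mask, n_classes=3)
print(encoded.shape)
```

Each pixel of encoded contains exactly one 1, located in the layer of its category.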

In this project we ran into a problem that we had not seen in previous projects. As mentioned above, we generated 26000 original images and 26000 mask images. Loading all of them into memory at once, together with an NVIDIA graphics card that has only 8GB of memory, breaks the training. Therefore we created the function generatebatchdata, which loads the original and mask images into memory in smaller batches, see code below. The parameters fullpathimages and fullpathmasks point to the directories of the training data. The lists imagenames and masknames contain the filenames of the original and mask images.

The function generatebatchdata reads the images with cv2.imread and appends them to imagelist and masklist. The makecolormask function is applied here to convert each json mask image to the 128x128x20 representation. The original images are additionally normalized by subtracting the mean and dividing by the standard deviation of the batch. Finally the batch is handed over with the yield statement. This makes generatebatchdata a Python generator: yield suspends the execution of the function until the training process requests the next batch, at which point execution resumes right after the yield.

def generatebatchdata(batchsize, fullpathimages, fullpathmasks, imagenames, masknames):
  
    assert(len(imagenames) == len(masknames))

    while True:
        batchstart = 0
        batchend = batchsize    
        
        while batchstart < len(imagenames):
            
            imagelist = []
            masklist = []
            
            limit = min(batchend, len(imagenames))

            for i in range(batchstart, limit):
                if imagenames[i].endswith(".png"):
                    imagelist.append(cv2.imread(os.path.join(fullpathimages,imagenames[i]),cv2.IMREAD_COLOR ))
                if masknames[i].endswith(".png"):
                    masklist.append(makecolormask(cv2.imread(os.path.join(fullpathmasks,masknames[i]),cv2.IMREAD_UNCHANGED )))


            train_data = np.array(imagelist, dtype=np.float32)
            train_mask= np.array(masklist, dtype=np.float32)

            train_data -= train_data.mean()
            train_data /= train_data.std()
    
            yield (train_data,train_mask)    

            batchstart += batchsize   
            batchend += batchsize
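
To illustrate how yield hands out one batch at a time, here is a stripped-down sketch that only produces the index ranges of the batches and loads no images. The name batch_ranges is hypothetical and not part of the project code.

```python
from itertools import islice

def batch_ranges(n_items, batchsize):
    """Yield (start, end) index pairs forever, starting over after each pass through the data."""
    while True:
        for start in range(0, n_items, batchsize):
            # execution pauses here until the consumer asks for the next batch
            yield start, min(start + batchsize, n_items)

# 70 items in batches of 30: two full batches, one short batch, then the next pass begins
gen = batch_ranges(n_items=70, batchsize=30)
first_four = list(islice(gen, 4))
print(first_four)
```

Because the outer loop never ends, the generator can serve any number of epochs; the training call decides how many batches make up one epoch.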

In the code below you see that the generator generatebatchdata is instantiated twice: once for the training data and once for the validation data. Beforehand we randomly selected about 15% of the original and mask images and moved them to the validation directories (fullpathimagesvalid and fullpathmasksvalid).

generator_train = generatebatchdata(30, fullpathimages, fullpathmasks, imagenames, masknames)
generator_valid = generatebatchdata(30, fullpathimagesvalid, fullpathmasksvalid,  imagenamesvalid, masknamesvalid)

The UNET model is instantiated and compiled in the code below. As input the model expects images with dimension dim with three layers (RGB) which is 128x128x3.

input_img = Input((dim[0], dim[1], 3), name='img')
model = get_unet(input_img, n_filters=len(colors), dropout=0.05, batchnorm=True)
model.compile(optimizer=Adam(), loss="categorical_crossentropy", metrics=["accuracy"])

During training we use three callback functions. The first callback, EarlyStopping, stops the training process if the loss value does not improve for ten epochs. The next callback, ReduceLROnPlateau, reduces the learning rate by a factor of 0.1 if the loss does not improve for three epochs. The last callback, ModelCheckpoint, saves the weights of the model during training whenever the loss value has improved. Since no monitor argument is given, all three callbacks watch the validation loss (val_loss), which is the Keras default.

callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ModelCheckpoint(os.path.join(path, dirmodels,"model-chkpt.h5"), verbose=1, save_best_only=True, save_weights_only=True)
]

The code below starts the training process. The training was executed on an NVIDIA 2070 graphics card with 8GB of memory. We reached an accuracy of 95.3% and a validation accuracy of 94.9%, so there is only slight overfitting. Training time was about 20 minutes.

model.fit_generator(generator_train,steps_per_epoch=530, epochs=20, callbacks=callbacks, validation_data=generator_valid, validation_steps=3064)
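
Since the generators never stop on their own, steps_per_epoch and validation_steps tell Keras how many batches make up one pass. A common choice is the number of images divided by the batch size, rounded up. The sketch below shows this arithmetic; the image counts are illustrative assumptions (an 85/15 split of 26000 images) and not the values used in the call above.

```python
import math

def steps_for(n_images, batchsize):
    """Number of batches needed to see every image once."""
    return math.ceil(n_images / batchsize)

# assumed counts for illustration only
n_train, n_valid, batchsize = 22100, 3900, 30
train_steps = steps_for(n_train, batchsize)
valid_steps = steps_for(n_valid, batchsize)
print(train_steps, valid_steps)
```

Smaller values shorten an epoch, while larger values simply make the generator start another pass through the data within the same epoch.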

The function predictedmask below is a utility function that converts the model’s output back to the original mask image form we received after processing the json files. It is the reverse function of makecolormask. The function predictedmask is useful for creating masks for display purposes, as seen in Figure 6 (in the middle), since 128x128x20 images cannot be displayed directly.

def predictedmask(masklist):
    y_list = []
    for mask in masklist:
        assert mask.shape == (dim[0], dim[1], 20)
        imgret = np.zeros((dim[0], dim[1]), np.uint8)
        for i in range(dim[0]):
            for j in range(dim[1]):
                result = np.where(mask[i,j,:] == np.amax(mask[i,j,:]))
                assert result[0][0] < len(colors)
                imgret[i,j] = result[0][0]
        y_list.append(imgret)
    return y_list
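
The per-pixel search for the layer with the highest value can also be expressed with a single NumPy call. This is a minimal sketch under the assumption that one mask has the shape (height, width, number of categories); the name class_index_mask is hypothetical.

```python
import numpy as np

def class_index_mask(probabilities):
    """Return, for every pixel, the index of the layer with the highest value."""
    # argmax over the last axis picks the first maximum, like np.where(...)[0][0] above
    return np.argmax(probabilities, axis=-1).astype(np.uint8)

# tiny 2x2 example with three categories
probabilities = np.array([[[0.1, 0.7, 0.2], [0.8, 0.1, 0.1]],
                          [[0.2, 0.2, 0.6], [0.3, 0.4, 0.3]]], dtype=np.float32)
labels = class_index_mask(probabilities)
print(labels)
```

The same call works on a whole batch of shape (n, height, width, categories) and then returns n masks at once.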

Figure 6 shows a verification example. An original image (left) was put into the trained model with the model’s predict method (model.predict), and the result can be seen in the middle of Figure 6. The function predictedmask was applied here. Overlaying the left and the middle image results in the right image.

Figure 6: Verifying the model

Creating an augmented video

The goal of this project is to create an augmented live video of the learn-factory while walking through it with a video camera. The application should mask the floor and label the machines with text, namely their categories. Basically each image of the video is fed into the trained neural network, which predicts a 128x128x20 mask image. However, further processing is needed, such as labeling the video images with text.

The function predictedmask above converts 128x128x20 mask images back to 128×128 mask images, whose pixel values are between 0 and 19. The Python dictionary categories below maps these values to the categories written as text. We have a similar mapping above (the Python dictionary classes), but here it is the other way around.

categories = {1: "1-Nadel1", 
              2: "1-Nadel2",
              3: "Tape1", 
              4: "Tape2", 
              5: "Boden", 
              6: "Tisch0",         
              7: "Tisch1",
              8: "Tisch2",
              9: "Tisch3",
              10: "Tisch4",
              11: "Tisch5",
              12: "M-Blau",          
              13: "M-Gelb",
              14: "M-Rot",
              15: "4-FUW",          
              17: "Ofen",   
              18: "Cutter",
              19: "Heizung",          
         }

As already mentioned, the masks created by predictedmask are one-layer images whose pixel values are between 0 and 19. To convert a mask to a color image, the function makemaskcolor is used. It uses the Python dictionary colors to map a pixel value to a color value. The output of makemaskcolor is a colored mask image of one category with shape 128x128x3. The parameter color indicates for which category the mask should be created. For example, if you want the mask of the floor (Boden, see Python dictionary categories), the value of color should be five. In a later post, we can show how this operation can be done more efficiently.

def makemaskcolor(mask, color):
    ret_mask = np.zeros((mask.shape[0], mask.shape[1], 3), 'uint8')
    for i in range(mask.shape[0]):
        for j in range(mask.shape[1]):
                if mask[i,j] == color:
                    ret_mask[i,j,0] = colors[color][0]
                    ret_mask[i,j,1] = colors[color][1]
                    ret_mask[i,j,2] = colors[color][2]
    return ret_mask
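
One possible way to avoid the nested loops is boolean indexing. The sketch below is an assumption about how this could look, not the version announced for a later post. The name color_single_class is hypothetical, and the color is passed in directly instead of being looked up in the dictionary colors.

```python
import numpy as np

def color_single_class(mask, class_id, color):
    """Return an (H, W, 3) image that shows only the pixels of class_id in the given color."""
    out = np.zeros(mask.shape + (3,), dtype=np.uint8)
    # mask == class_id selects all pixels of the category at once
    out[mask == class_id] = color
    return out

# tiny 2x2 example: category 5 is painted green, everything else stays black
mask = np.array([[5, 0], [5, 3]], dtype=np.uint8)
floor = color_single_class(mask, class_id=5, color=(0, 255, 0))
print(floor[:, :, 1])
```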

Below is the code of the function markpicture. The input parameter inp_img is a mask image from the predictedmask output. The parameter orig_img is the original image, which has been fed into the neural network model for the mask prediction. We decided to show only the mask of the floor. The function markpicture creates the floor mask with the following code (boden stands for floor):

boden = makemaskcolor(inp_img,5)

Then the function markpicture iterates through the category values, omitting 5 and 16. Category 5 is the floor, which is handled separately, and category 16 is a window (Fenster stands for window), which needs no label. OpenCV’s inRange method extracts the mask of the i’th category (see for loop) and assigns it to inp_img_new. The code calls OpenCV’s findContours to get the contours of the i’th category and picks the largest one. This contour needs to exceed a minimum area, which is determined by cv2.contourArea. The code then calculates the center of gravity of the contour and puts text indicating the category at this position onto the original image. The text is drawn twice: first in white with a larger line thickness, and then in the color of the category (see dictionary colors) with a smaller thickness. This gives the text a white outline, so it stands out better on the image. Finally markpicture overlays the processed image with the floor mask (cv2.addWeighted) and returns the image.

def markpicture(inp_img, orig_img):

    update_img = orig_img.copy()

    img_new = np.zeros((inp_img.shape[0], inp_img.shape[1],3), 'uint8')
    inp_img_new = np.zeros((inp_img.shape[0], inp_img.shape[1],3), 'uint8')
    boden = np.zeros((inp_img.shape[0], inp_img.shape[1],3), 'uint8')
    boden_resized = np.zeros((orig_img.shape[0], orig_img.shape[1],3), 'uint8')
    weighted = np.zeros((orig_img.shape[0], orig_img.shape[1],3), 'uint8')

    boden = makemaskcolor(inp_img,5)

    for i in [1,2,3,4,6,7,8,9,10,11,12,13,14,15,17,18,19]:

        inp_img_new = inp_img.copy()
        inp_img_new = inp_img_new.astype(np.uint8)

        inp_img_new = cv2.inRange(inp_img_new,i,i)
        contours,_ = cv2.findContours(inp_img_new,cv2.RETR_TREE,cv2.CHAIN_APPROX_NONE)

        if len(contours) > 0:

            contours = sorted(contours, key=cv2.contourArea,  reverse=True)

            M = cv2.moments(contours[0])
            area = cv2.contourArea(contours[0])
            if M['m00'] > 0 and area > 600:
                
                # x scales with the image width (shape[1]), y with the image height (shape[0])
                cx = int(M['m10']/M['m00']*orig_img.shape[1]/inp_img.shape[1])
                cy = int(M['m01']/M['m00']*orig_img.shape[0]/inp_img.shape[0])
                update_img= cv2.putText(update_img,categories[i],(cx,cy), cv2.FONT_HERSHEY_SIMPLEX,0.5 if area > 800 else 0.25, (255,255,255), 4) 
                update_img= cv2.putText(update_img,categories[i],(cx,cy), cv2.FONT_HERSHEY_SIMPLEX,0.5 if area > 800 else 0.25, colors[i], 2 if area > 500 else 1) 
            
    boden_resized = cv2.resize(boden, (orig_img.shape[1], orig_img.shape[0]), interpolation = cv2.INTER_AREA)  # dsize is (width, height)

    assert(boden_resized.shape == update_img.shape)
    cv2.addWeighted(update_img, 0.6, boden_resized, 0.4, 0, weighted)            
            
    return weighted

In Figure 7 you can find an output of markpicture. On the left side you see a representation of the mask from the neural network prediction, and on the right side the processed original image with text and overlaid floor. Here you find the text of the categories Tape1 and Tape2.

Figure 7: Mask and labeled original image

To create an augmented video we used the code below. Before augmentation we walked through the learn-factory, recorded a normal video from a robot’s perspective and named it video.mp4. OpenCV’s VideoCapture opens the video and iterates through it image by image. VideoCapture’s read method extracts each image from the video and assigns it to frame. The frame is resized and normalized. Then the code feeds it into the neural network’s predict method. The output is a predicted mask (with shape 128x128x20), which is converted to the other mask representation (shape 128×128) with the function predictedmask. Its output is fed into markpicture, which puts text on the image and marks the floor. Finally the image is written to a new video, videounet128-1.mp4.

cap = cv2.VideoCapture(os.path.join(path,"videos",'video.mp4'))
if (cap.isOpened() == True): 
    
    print("Opening video stream or file")
    out = cv2.VideoWriter(os.path.join(path,"videos",'videounet128-1.mp4'),cv2.VideoWriter_fourcc(*'MP4V'), 25, (256,256))
    while(cap.isOpened()):
        ret, frame = cap.read()
        
        if ret == False:
            break
        
        frame_resized = np.zeros((dim[0], dim[1], 3), 'uint8')  
        frame_resized = cv2.resize(frame, (dim[0],dim[1]), interpolation = cv2.INTER_AREA) 
        test_data = []
        test_data.append(frame_resized)
        test_data = np.array(test_data, dtype=np.float32)
        test_data -= test_data.mean()
        test_data /= test_data.std()
        
        predicted = model.predict(test_data, batch_size=1, verbose=0)
        assert(len(predicted) == 1)
        pmask = predictedmask(predicted)[0]  # predictedmask expects a list of masks and returns a list
        
        if ret == True:
            img = markpicture(pmask, frame)
            out.write(img)
            cv2.imshow('Frame',img)
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break

    out.release()
            
cap.release()
cv2.destroyAllWindows()

Conclusion

The resulting videos are astonishing. You can walk through the learn-factory with a video camera, process the video in real time, and receive an output such as the one displayed in Figure 8. The machines are mostly labeled correctly, and the floor mask indicates where an autonomous guided vehicle could drive.

Figure 8: Snapshot of an augmented video

There are situations where the labeling does not work correctly, especially if you walk into areas where the students have not taken any pictures and therefore no labeling was done. In Figure 3 this is the area on the right side. Figure 9 shows such a snapshot, where the labeling is completely incorrect. However, the floor is still predicted correctly. Since no training data existed for this area, the chaotic prediction is expected.

Figure 9: Snapshot of video with chaotic predictions

Acknowledgement

Special thanks to the class of Summer Semester 2020 Forschungsprojekt Industrie 4.0 for providing the images of the learn-factory (13000!) and labeling the images for the training data (also 13000!!) used for the neural network. We appreciate this very much, and we know how much effort you have put into this.

Also special thanks to the University of Applied Science Albstadt-Sigmaringen for hosting the learn-factory and providing the appliances to enable this research.

License Plate Recognition using Neural Network

At the school where I work, I teach a class called Design Cyber Physical Systems. The name of the class leaves its content open to many interpretations, and I leave this open intentionally. In previous semesters, I let the students choose a topic that has something to do with sensors, actuators and micro-controllers. The students have to brainstorm a specific implementation idea, then create a plan and execute it during the semester. This semester I changed the topic: this time the implementation idea has to contain a neural network and a camera system.

Three students who participated in this class decided to create a system which takes images from an access road having a gate towards a parking lot. As soon as a car approaches, the camera takes an image of the car with its license plate. The system has to determine the position of the license plate on the image and extract the license plate. Algorithms figure out the characters and numbers and compare them with the license plates stored in a database. If the license plate is identical to one of the license plates in the database, the gate is opened. Due to organizational problems with accessing a parking lot gate, we decided to use a simple signal light instead.

Data Preprocessing

The system’s application first needs to find the license plate in the car’s image. This can be done in various ways, but we chose to use a neural network. We need many training images, which are images of cars from the front. On each image we need to mark the license plate to obtain the mask image needed for the neural network training.

There are already programs available to download images, e.g. chromedriver. The program we used can be found here. You can control the search criteria with options, and the program automatically downloads numerous images. The search criterion in our case was simply “license plate car”. Not every downloaded image served our purpose. We limited ourselves to German license plates, so we had to filter the useful images manually. Altogether we gathered around 760 training and test images.

The next step was to label the areas of license plates of the downloaded images. We found a tool called labelme, which we had to install. Note that we managed to install only an older version of labelme on Ubuntu 18.04 (command: sudo pip3 install labelme==3.3.3). In Figure 1 you can see the tool labelme with an uploaded image. As a user you can click with the mouse around the license plate so a polygon is created. The points of the polygon can be saved into a json file. We have done this for all 760 images and 760 json files were created.

Figure 1: Labelme

The next step was to create mask images from the json files. Each json file contains a number of points, which are parsed by the function create_mask_from_image below. It opens the json file, retrieves the points, and uses skimage.draw’s polygon method to create the mask image. Finally it stores the image at mask_path inside the masks directory.

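import json
import os

import numpy as np
from skimage.draw import polygon
from skimage.io import imsave  # assumption: the original listing does not show where imsave comes from

def create_mask_from_image(path, json_file, img_width, img_height, file_number, output_dir):
    json_path = os.path.join(path, json_file)

    mask_name = "img_" + str(file_number) + ".png"
    mask_path = os.path.join(output_dir, "masks", mask_name)

    # read the polygon points saved by labelme and close the file afterwards
    with open(json_path) as f:
        data = json.load(f)

    # labelme stores points as (x, y); polygon expects rows (y) first, then columns (x)
    vertices = np.array([[point[1],point[0]] for point in data['shapes'][0]['points']])
    vertices = vertices.astype(int)

    img = np.zeros((img_height, img_width), 'uint8')

    # set all pixels inside the polygon to one, everything else stays zero
    rr, cc = polygon(vertices[:,0], vertices[:,1], img.shape)
    img[rr,cc] = 1

    imsave(mask_path, img)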

This function was repeated for all available json files. So at this moment we had all training images and mask images stored in the paths dataset/images/ and dataset/masks/. Note that the names of the training images and the corresponding mask images need to be identical.

Training of the Neural Network

What we are facing is a segmentation problem. After we feed the application with an image of a car, we want to receive an image indicating where the license plate is. A U-Net can be used to solve such a problem; this was already discussed in several earlier posts on this blog. This time, however, we used the library keras_segmentation. The model is returned by the segnet method. We chose to use images with a height of 350 and a width of 525.

from keras_segmentation.models.segnet import segnet

model = segnet(n_classes=2, input_height=350, input_width=525)

path="/home/.../dataset/"

model.train(
    train_images =  path+"images/",
    train_annotations = path+"masks/",
    checkpoints_path = "/tmp/segnet", epochs=3
)

model.save("weights.h5")

The train method executes the training. On an NVIDIA 2070 graphics card, training for three epochs took about three minutes and reached an accuracy of 99.4%. After training we saved the weights to a file called weights.h5.

Testing of the Neural Network

We have put a few images aside to test the trained model. The code below loads the model by using the method load_weights. A test image is read and shown with matplotlib’s imshow.

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

model.load_weights("weights.h5")
img=mpimg.imread("/home/.../img_1.jpg")
imgplot = plt.imshow(img)
plt.show()

Figure 2 shows the test image from matplotlib. The license plate can be clearly seen at the lower/middle part of the image.

Figure 2: Test Image

To predict the license plate area on the image, we need to feed the test image into the trained model. This can be done with the predict_segmentation method. This method writes the predicted image to out.png.

test_img = "/home/.../img_1.jpg"

out = model.predict_segmentation(
    inp=test_img,
    out_fname="dataset/tests/out.png",
)

plt.imshow(out)

The code above calls the matplotlib method imshow and in Figure 3 you can see the predicted mask image derived from the test image.

Figure 3: Predicted Mask

At the moment you cannot see how Figure 2 and Figure 3 overlap, so we wrote code to create an added image from the test image and the predicted mask image, see code below. Note that each pixel of the predicted mask image only has two values, zeros and ones. In order to add Figure 2 and Figure 3, we need to multiply the predicted mask image by 255. The OpenCV method addWeighted adds both images to a new image.

orig_img = cv2.imread(test_img)
out = out.astype('float32')
out = cv2.resize(out, (orig_img.shape[1], orig_img.shape[0]))
new_out = np.zeros((orig_img.shape[0], orig_img.shape[1], 3), dtype="uint8")
new_out[:,:,0] = out[:,:] * 255
orig_img = cv2.cvtColor(orig_img, cv2.COLOR_BGR2RGB)
plt.imshow(cv2.addWeighted(orig_img, 0.5, new_out, 0.5, 0.0))

Matplotlib’s method imshow shows the added image, see Figure 4. You can see that both images align to each other very well. The license plate is highlighted with red color.

Figure 4: Added Images

Position Detection of the License Plate

The next step is to find the position of the mask to receive a bounding box around the mask. We can use the OpenCV method findContours to receive the contour of the mask. The code below shows how we call findContours.

contours,_ = cv2.findContours(np.array(out, "uint8"), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
plt.imshow(cv2.drawContours(orig_img, [contours[0]], -1, (255,0,0), 2))

Figure 5 shows the output image created by the OpenCV method drawContours.

Figure 5: Mask’s Contour

The code below creates a bounding box from the contour around the license plate, which is assumed to be the first element of the output list contours. The OpenCV method minAreaRect finds the rectangle with the minimum area around the contour, and boxPoints returns its corner points rect_corners. OpenCV’s drawContours draws the bounding box onto the image, which is then shown with matplotlib.

rect = cv2.minAreaRect(contours[0])
rect_corners = cv2.boxPoints(rect)
rect_corners = rect_corners.astype(int)  # np.int0 was removed in NumPy 2.0

orig_img = mpimg.imread(test_img)
contour_img = cv2.drawContours(orig_img, [rect_corners], 0, (0,255,0),  2)
plt.imshow(contour_img)

In Figure 6 you can see the bounding box drawn around the license plate. In general, the edges of the bounding box are not parallel to the edges of the test image. It is quite possible that the bounding rectangle is rotated. This is something you cannot see in Figure 6.

Figure 6: Box bounding the License Plate

Warping the License Plate Image

Before recognizing the letters of the license plate image, we should transform the bounding box to an upright rectangle. The function order_points_clockwise in the code below sorts the points of rect_corners clockwise, starting with the upper left corner. The rearranged list is assigned to rect_corners_clockwise. The function warp_img extracts the license plate piece from the original test image and transforms it to an upright rectangle using the OpenCV methods getPerspectiveTransform and warpPerspective. The method warpPerspective receives the width and height of the extracted license plate from the function get_polygon_dimensions. Note again that the bounding box is in general not aligned with the image axes. The function get_polygon_dimensions uses the Pythagorean theorem to compute the side lengths of the box from its corner points and takes them as width and height. OpenCV’s method getPerspectiveTransform calculates the transformation matrix, and OpenCV’s method warpPerspective transforms the license plate image so that it has an upright rectangular shape.

def get_polygon_dimensions(points):
    from math import sqrt
    (tl, tr, br, bl) = points
    widthA = sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))
    widthB = sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))
    heightA = sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))
    heightB = sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))

    width = max(int(widthA), int(widthB))
    height = max(int(heightA), int(heightB))

    return (width, height)

def warp_img(img, points):
    width, height = get_polygon_dimensions(points)
    dst = np.array([
        [0, 0],
        [width - 1, 0],
        [width - 1, height - 1],
        [0, height - 1]], dtype = "float32")
    
    M = cv2.getPerspectiveTransform(points, dst)
    warped_img = cv2.warpPerspective(img, M, (width, height))

    return warped_img

def order_points_clockwise(pts):
    rect = np.zeros((4, 2), dtype="float32")
  
    s = pts.sum(axis=1)
    rect[0] = pts[np.argmin(s)]
    rect[2] = pts[np.argmax(s)]
    
    diff = np.diff(pts, axis=1)
    rect[1] = pts[np.argmin(diff)]
    rect[3] = pts[np.argmax(diff)]
 
    return rect

rect_corners_clockwise = order_points_clockwise(rect_corners)
orig_img = mpimg.imread(test_img)
warped_img = warp_img(orig_img, np.array(rect_corners_clockwise, "float32"))
plt.imshow(warped_img)

gray_img = cv2.cvtColor(warped_img, cv2.COLOR_RGB2GRAY)
_,prediction_img = cv2.threshold(gray_img, 50, 255, cv2.THRESH_BINARY)
plt.imshow(prediction_img)
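
To make the corner ordering and the side length calculation easier to follow, here is a small worked example in plain Python. It is only a sketch of the same idea: the names order_corners and side_lengths and the four corner points are made up for illustration.

```python
from math import hypot

def order_corners(points):
    """Order four (x, y) points as top-left, top-right, bottom-right, bottom-left."""
    tl = min(points, key=lambda p: p[0] + p[1])  # smallest x + y
    br = max(points, key=lambda p: p[0] + p[1])  # largest x + y
    tr = min(points, key=lambda p: p[1] - p[0])  # smallest y - x
    bl = max(points, key=lambda p: p[1] - p[0])  # largest y - x
    return [tl, tr, br, bl]

def side_lengths(ordered):
    """Width and height of the box from the distances between neighboring corners."""
    tl, tr, br, bl = ordered
    width = max(hypot(tr[0] - tl[0], tr[1] - tl[1]), hypot(br[0] - bl[0], br[1] - bl[1]))
    height = max(hypot(bl[0] - tl[0], bl[1] - tl[1]), hypot(br[0] - tr[0], br[1] - tr[1]))
    return int(width), int(height)

# a slightly rotated box, with its corners given in arbitrary order
corners = [(110, 60), (10, 50), (8, 70), (108, 80)]
ordered = order_corners(corners)
print(ordered, side_lengths(ordered))
```

In image coordinates y grows downwards, which is why the upper left corner has the smallest sum x + y and the upper right corner the smallest difference y - x.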

In the upper part of Figure 7 you can see the license plate piece from the original test image. In the lower part of Figure 7 you can see the warped license plate image, converted to grayscale and passed through a threshold filter. Note that the warping in Figure 7 does not make much of a difference. However, the rectangle edges are not necessarily parallel to the test image edges, so the transformation is really needed in some cases.

Figure 7: License Plate

Character Recognition

Figure 8 shows the set of reference characters for German license plates, which are available as one image per character. In the code below Figure 8, we assign the directory with the reference character images to the variable feletterspath.

Figure 8: Set of Characters for Reference

The main function from the code below is get_prediction which is called with a transformed and grayscaled license plate image as an input parameter.

First the function get_prediction finds the contours of the license plate image. The contours are forwarded to the _get_rectangles_around_letters function. It checks the height and width of each contour’s bounding rectangle with the function _check_dimensions, which tests whether the height lies between half and the full height of the license plate image and the width lies between one seventeenth and one eighth of its width. If this is the case, there is a high probability that the contour is a character. The function _get_rectangles_around_letters collects the matching bounding rectangles in the list rectangles and sorts them from left to right using the sort function. The rectangles in the list now have a high probability of containing characters.

feletterspath="/home/.../feletters/"

def get_prediction(img):
 
    img_dimensions = (660, 136)
    img = cv2.resize(img, (img_dimensions))

    contours,_ = cv2.findContours(img, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE)
    
    rectangles = _get_rectangles_around_letters(contours, img_dimensions)
    
    if len(rectangles) < 3:
        return
    
    letter_imgs = _get_letter_imgs(img, rectangles)
    letters = _get_letter_predictions(letter_imgs)
    letters = _add_space_characters(letters, rectangles)
    return letters


def _check_dimensions(img_dimensions, rectangle):

    img_width, img_height = img_dimensions
    (x,y,w,h) = rectangle
    letter_min_width, letter_max_width = img_width / 17, img_width / 8
    letter_min_height, letter_max_height = img_height / 2, img_height 
    rectangle_within_dimensions = (w > letter_min_width and w < letter_max_width) \
        and (h > letter_min_height and h < letter_max_height)

    return rectangle_within_dimensions


def _get_rectangles_around_letters(contours, img_dimensions):

    rectangles = []

    for contour in contours:

        rectangle = cv2.boundingRect(contour)

        has_letter_dimensions = _check_dimensions(img_dimensions, rectangle)
        if has_letter_dimensions:
            rectangles.append(rectangle)

    rectangles.sort(key=lambda tup: tup[0])

    return rectangles

The function get_prediction from the code above calls the function _get_letter_imgs from the code below, which extracts the character images from the input license plate image and returns them as a list. The function _get_letter_predictions iterates through this list and executes the function _match_fe_letter for each entry. The function _match_fe_letter iterates through the set of license plate reference characters (shown in Figure 8) and applies the OpenCV matchTemplate method after the character image is resized to the shape of the reference image. OpenCV’s matchTemplate returns a value indicating the similarity of the license plate character to the reference character. The reference character with the highest similarity is chosen as the matched character. Finally the function _get_letter_predictions returns the matched characters as a string.

Figure 7 shows a space between “BL” and “LU” and a space between “LU” and “613”. The function _add_space_characters adds a blank character between two matched characters if the gap between their character images exceeds a certain threshold (20 pixels in the code below).

def _get_letter_imgs(img, rectangles):

    letter_imgs = []

    for rect in rectangles:
        (x,y,w,h) = rect
        current_letter = img[y:y+h, x:x+w]
        letter_imgs.append(current_letter)
        
    return letter_imgs


def _get_letter_predictions(letter_imgs):

    letters = ""

    for letter_img in letter_imgs:
        prediction = _match_fe_letter(letter_img)
        letters += prediction

    return letters

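def _add_space_characters(letters, rectangles):

    space_counter = 0

    # Compare each rectangle with its right-hand neighbour. The last rectangle
    # has no neighbour, so the loop stops one element early (this also avoids
    # an IndexError when only one rectangle was found).
    for n in range(len(rectangles) - 1):
        (x1,_,w1,_) = rectangles[n]
        (x2,_,_,_) = rectangles[n+1]
        distance = x2-(x1+w1)

        if distance > 20:
            # Shift the insert position by the number of blanks already added.
            index = n + 1 + space_counter
            space_counter += 1
            letters = letters[:index] + ' ' + letters[index:]

    return letters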


def _match_fe_letter(img):

    fe_letter_dir = feletterspath

    # List the reference characters once so that file names and similarities stay aligned.
    template_files = sorted(os.listdir(fe_letter_dir))
    similarities = []

    for template_img in template_files:
        template = cv2.imread(os.path.join(fe_letter_dir, template_img), cv2.IMREAD_GRAYSCALE)
        # Resize into a new variable so the original character image is not
        # degraded by being resized again and again.
        resized = cv2.resize(img, (template.shape[1], template.shape[0]))
        similarity = cv2.matchTemplate(resized,template,cv2.TM_CCOEFF_NORMED)[0][0]
        similarities.append(similarity)

    letter_array = [os.path.splitext(letter)[0] for letter in template_files]

    letter = letter_array[similarities.index(max(similarities))]

    return letter
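
The spacing rule of _add_space_characters can be tried out without OpenCV. The following is only a minimal sketch: the function add_spaces, the parameter min_gap and the rectangle coordinates are made up for illustration and are not part of the students' code.

```python
def add_spaces(letters, rectangles, min_gap=20):
    """Insert a blank wherever two neighbouring boxes are more than min_gap pixels apart."""
    out = []
    for n, letter in enumerate(letters):
        out.append(letter)
        if n + 1 < len(rectangles):
            x1, _, w1, _ = rectangles[n]
            x2 = rectangles[n + 1][0]
            if x2 - (x1 + w1) > min_gap:
                out.append(" ")
    return "".join(out)


# Hypothetical (x, y, w, h) boxes, sorted by x, for a plate like the one in Figure 7.
boxes = [(10, 5, 30, 60), (45, 5, 30, 60),                      # B, L
         (110, 5, 30, 60), (145, 5, 30, 60),                    # L, U
         (210, 5, 30, 60), (245, 5, 30, 60), (280, 5, 30, 60)]  # 6, 1, 3
print(add_spaces("BLLU613", boxes))
```

With these assumed coordinates, the gaps inside a group are 5 pixels and the gaps between groups are 35 pixels, so only the latter produce a blank.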

The function get_prediction is called several times with differently processed input images, see code below. The code calls OpenCV’s threshold with a range of thresholds and feeds the images into the method get_prediction. The result is appended to the results list.

results = []

for i in range(50,200,10):
    _,prediction_img = cv2.threshold(gray_img, i, 255, cv2.THRESH_BINARY)
    prediction = get_prediction(prediction_img)
    if prediction is not None:
        results.append((i,prediction))

Figure 9 shows the list of results for the license plate’s input image. You can see that the code correctly predicted the license plate six times. The prediction that occurs most often in the list can be used as the final output.

Figure 9: Result List
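
A majority vote over such a result list could look like the following sketch. The function name majority_vote and the sample predictions are assumptions for illustration; only the (threshold, prediction) tuple format is taken from the code above.

```python
from collections import Counter


def majority_vote(results):
    """Return the most frequent prediction and its count from (threshold, prediction) pairs."""
    counts = Counter(prediction for _, prediction in results)
    if not counts:
        return None, 0
    return counts.most_common(1)[0]


# Made-up result list in the same (threshold, prediction) format as above.
results = [(90, "BL LU 613"), (100, "BL LU 613"), (110, "BL LU 6I3"),
           (120, "BL LU 613"), (130, "8L LU 613")]
best, votes = majority_vote(results)
print(best, votes)
```

If two predictions occur equally often, Counter.most_common returns the one that was seen first, so a stricter application may want to treat a tie as "no result".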

Conclusion

In this blog post I described a gate opening system designed by students from the class Design Cyber Physical Systems. The idea was that a car approaches the gate and a camera system takes images of the car, including its license plate. We trained a neural network to produce a mask indicating the license plate’s position in the image. The application used the mask to extract the license plate from the image and then applied character recognition based on OpenCV.

The application was distributed over two computers. One computer (a Raspberry Pi) took the images and controlled the output relay; the other computer calculated the mask image with a neural network. We did not actually open a gate as described in the introduction. Instead, we connected a signal light to a relay which was controlled by the Raspberry Pi. The communication between both computers was realized by a REST interface.

The character recognition only worked well if we fed in the license plate image several times with different thresholds. A majority vote was taken to choose the recognized license plate number.

Acknowledgement

Thanks to Jonas Acker, Marc Bitzer and Thomas Schöller for participating in the class Design Cyber Physical Systems and providing the code which was the result of the project from this class.

Also special thanks to the University of Applied Science Albstadt-Sigmaringen for offering a classroom and appliances to enable this research.

Fruit Recognition on a Raspberry Pi

As an instructor I offer a class in which I let master students choose a topic for practical work during a semester. I usually give them a rough description, which last time included a Raspberry Pi, a camera, and a neural network.

Some students chose to work on fruit recognition with a camera. The scenario is the following: The camera is connected to a Raspberry Pi and observes a clean table. As soon as a user puts a fruit onto the table, the user can hit a button on a shield attached to the Raspberry Pi. The button triggers the camera to take an image. Then the image is fed into a trained neural network for image categorization. The category is then fed into a speech synthesizer, which speaks out the category.

The type of neural network my students and I used is a multi-categorical neural network. So the goal was to feed the neural network with an image and to receive a category as output.

Preparing the Data

In the beginning we chose fruit images from a database which is available on github. You find it here. It had about 120 different categories of fruits and vegetables available. The problem we found with these images is that the fruits and vegetables look perfect, which is not the case in reality. The variation of fruit images within one category also seemed to be very limited. On the one hand, there are many images within each category; on the other hand, it looks like all images of one category come from a single perfect fruit photographed in different positions.

The fruits also fill out the complete image. When you photograph a fruit on a table, this is in general not the case. The left part of Figure 1 shows an orange which fills only part of the image.

What is more, the background of the images from the database is extremely bright. This is not a realistic background; we found that backgrounds are much darker when you take pictures inside a building. In Figure 2 you can see two different backgrounds, which are the surfaces of two different tables. These backgrounds have relatively low brightness.

Cropping the images

The first task was to prepare the data for training the neural network. We decided to crop the images to the size of the fruits in order to standardize the images to some degree. Below you find the code which crops the images to the size of the fruit. In this case we have the fruit images inside the addfolder. Inside the addfolder there are two directories, Testing and Training. Below these directories you find one directory for each fruit. We limit the number of fruits to six. The fruits we use are listed in dirlist, whose entries are also the directory names.

The code iterates through the Testing and Training directories and the fruit directories in dirlist and loads every image with the OpenCV function imread. It converts the loaded image to a grayscale image and filters it with the OpenCV threshold function. After this we apply the findContours function, which returns a list of contours of the image. The second largest contour (the largest contour has the size of the image itself) is taken, and its position, width and height are retrieved with boundingRect. The second largest contour is the fruit portion of the image. The application copies a square at the position of the second largest contour from the original image, resizes it to 100×100 pixels and saves it into the directory destfolder.

import glob
import os

import cv2

srcfolder = '/home/inf/Bilder/Scale/orig/'
destfolder = '/home/inf/Bilder/Scale/cropped/'
addfolder = '/home/inf/Bilder/Scale/added/'
processedfolder = '/home/inf/Bilder/Scale/processed/'

dirtraintest = ['Testing', 'Training']
dirlist = ['Apfel','Gurke','Kartoffel','Orange','Tomate','Zwiebel']

count = 0
pattern = "*.jpg"
img_size = (100,100)

for traintest in dirtraintest:
    for fruit in dirlist:
        count = 0
        for file in glob.glob(os.path.join(addfolder, traintest, fruit, pattern)):
            im = cv2.imread(file)
            imgray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
            ret, thresh = cv2.threshold(imgray, 127, 200, 0)
            contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
            if len(contours) <= 1:
                # No fruit contour found: skip this image instead of reusing a crop
                # from a previous iteration (or failing on the very first image).
                continue
            cnt = sorted(contours, key=cv2.contourArea)
            x, y, w, h = cv2.boundingRect(cnt[-2])
            w = max((h, w))
            h = w
            crop_img = im[y:y+h, x:x+w]
            im = cv2.resize(crop_img, img_size)
            cv2.imwrite(os.path.join(destfolder, traintest, fruit, str("cropped_img_"+str(count)+".jpg")), im)
            count += 1

Figure 1 shows how the application crops an image of an orange. On the left side, the orange fills out only part of the image. On the right side, the orange fills out the complete image.

Figure 1: Original Image and Cropped Image
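
One detail of the cropping step is worth a small illustration: the bounding box is widened to a square with w = max(h, w), and if this square reaches beyond the image border, NumPy slicing silently truncates it, so the crop is no longer square and the subsequent resize distorts the fruit. The sketch below shows one possible way to keep the square inside the image. The helper square_box is hypothetical and not part of the students' code.

```python
def square_box(x, y, w, h, img_w, img_h):
    """Return (x, y, side) of a square covering the box, shifted to stay inside the image."""
    side = min(max(w, h), img_w, img_h)   # the square cannot be larger than the image
    x = max(0, min(x, img_w - side))      # shift left if it would overflow on the right
    y = max(0, min(y, img_h - side))      # shift up if it would overflow at the bottom
    return x, y, side


# A tall box near the right border of a 100x100 image (made-up numbers).
print(square_box(80, 10, 15, 40, 100, 100))
```

The returned values can be used in the same way as before, for example im[y:y+side, x:x+side].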

Changing the backgrounds

Because the backgrounds of the images from the database are extremely bright, we decided to replace the bright backgrounds with new ones. In Figure 2, you can see two different table surfaces, photographed with the camera we used.

Figure 2: Backgrounds

The code below shows how each image from the directory structure (which I explained above) is loaded into the variable pixels with the OpenCV imread function. For each pixel, every color layer is checked against a brightness threshold. We assume that a pixel which reaches the threshold in all three layers is a background pixel (which is not always the case). The application then replaces the pixel with the corresponding pixel from a background image shown in Figure 2. It saves the new image to the directory processedfolder.

background = cv2.imread("background.jpg")

# Resize the background to the size of the fruit images.
bg = cv2.resize(background, img_size)
bgData = bg.copy()

threshold = (100, 100, 100)

for traintest in dirtraintest:
    for fruit in dirlist:
        count = 0
        for name in glob.glob(os.path.join(destfolder, traintest, fruit, pattern)):
            pixels = cv2.imread(name)  # glob.glob already returns the complete path
            pixelsData = pixels.copy()

            for i in range(pixels.shape[0]): # for every pixel:
                for j in range(pixels.shape[1]):
                    if pixelsData[i, j][0] >= threshold[0] and pixelsData[i, j][1] >= threshold[1] and pixelsData[i, j][2] >= threshold[2]:
                        pixelsData[i, j] = bgData[i, j]
            cv2.imwrite(os.path.join(processedfolder, traintest, fruit, str("processed_img_"+str(count)+".jpg")), pixelsData)
            count += 1

Figure 3 shows the output of two images from the code above. It shows the same orange with two different backgrounds.

Figure 3: Orange with two different Backgrounds
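
The pixel-by-pixel loop of the background replacement is easy to follow but slow in Python. As a sketch of an alternative, the same rule can be expressed with a NumPy mask. The function replace_bright_pixels and the tiny synthetic images are assumptions for illustration, not the original code.

```python
import numpy as np


def replace_bright_pixels(image, background, threshold=(100, 100, 100)):
    """Replace every pixel whose three layers all reach the threshold with the background pixel."""
    limits = np.array(threshold, dtype=image.dtype)
    mask = np.all(image >= limits, axis=2)   # True where all three layers are bright
    result = image.copy()
    result[mask] = background[mask]
    return result


# Tiny synthetic example: a bright 2x2 image with one darker "fruit" pixel.
image = np.full((2, 2, 3), 250, dtype=np.uint8)
image[0, 0] = (20, 60, 200)
background = np.full((2, 2, 3), 40, dtype=np.uint8)
print(replace_bright_pixels(image, background))
```

The image and the background must have the same shape, which is the case when both are resized to img_size as in the code above.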

Training the Model

Below is the code of the neural network model. It consists of four convolutional layers. The number of filters increases with each layer. After each convolutional layer there is a max pooling layer to reduce the image size for the input of the following layer; from the second convolutional layer on, a dropout layer follows the pooling layer. A flatten layer follows and is fed into a dense layer. Finally there is another dense layer with six neurons. This is the number of categories we have. Each layer uses the relu activation function, except the last layer, where we use the softmax activation function. The reason for softmax, and not sigmoid, is that we expect exactly one of the six categories to be true for a given input image. The predicted category is the one with the highest value among the six output neurons. For optimization, we use the stochastic gradient descent method.

model = Sequential()
model.add(Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same', input_shape=input_shape))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.1))
model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.1))
model.add(Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.1))
model.add(Flatten())
model.add(Dense(256, activation='relu', kernel_initializer='he_uniform'))
model.add(Dropout(0.1))
model.add(Dense(6, activation='softmax'))

opt = SGD(lr=0.001, momentum=0.9)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
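
To make the difference between softmax and sigmoid in the last layer more tangible, here is a small sketch in plain Python. The six input values are made up; they stand for the raw outputs of the last layer before the activation function is applied.

```python
import math


def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    highest = max(logits)
    exps = [math.exp(v - highest) for v in logits]   # subtract the maximum for numerical stability
    total = sum(exps)
    return [e / total for e in exps]


def sigmoid(logits):
    """Squash each score independently into the range (0, 1)."""
    return [1.0 / (1.0 + math.exp(-v)) for v in logits]


logits = [2.0, 1.0, 0.1, -1.0, 0.5, 3.0]   # made-up raw outputs for the six categories
print([round(p, 3) for p in softmax(logits)], round(sum(softmax(logits)), 6))
print([round(p, 3) for p in sigmoid(logits)], round(sum(sigmoid(logits)), 6))
```

The softmax values compete with each other and add up to one, which fits the assumption that exactly one fruit is on the table. The sigmoid values are independent of each other and can be high for several categories at once.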

We load all training and validation images from the directories train_path and valid_path with the Keras ImageDataGenerator. The ImageDataGenerator rescales the images and, for the training data, augments them by shifting and flipping. The images are provided through the iterators train_it and valid_it. The method flow_from_directory makes this task easy since it takes the directory structure below train_path and valid_path into account. In our case, we have the directories Apfel, Gurke, Kartoffel, Orange, Tomate and Zwiebel below train_path and valid_path. In each of these directories you find the corresponding images (all apple images in directory Apfel, all cucumber images in directory Gurke, etc.).

train_datagen = ImageDataGenerator(rescale=1.0/255.0,width_shift_range=0.1, height_shift_range=0.1, horizontal_flip=True)
test_datagen = ImageDataGenerator(rescale=1.0/255.0)

train_it = train_datagen.flow_from_directory(train_path,class_mode='categorical', batch_size=64, target_size=image_size)
valid_it = test_datagen.flow_from_directory(valid_path,class_mode='categorical', batch_size=64, target_size=image_size)

The training is started with the Keras fit_generator command. It uses the iterators train_it and valid_it as inputs. We defined callbacks to stop the training early when the validation loss no longer improves, to reduce the learning rate on a plateau, and to save a checkpoint of the neural network weights each time the validation loss improves.

callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ModelCheckpoint('modelmulticat.h5', verbose=1, save_best_only=True, save_weights_only=True)
]

history = model.fit_generator(train_it, steps_per_epoch=len(train_it),validation_data=valid_it, validation_steps=len(valid_it), epochs=10, callbacks=callbacks, verbose=1)

_, acc = model.evaluate_generator(valid_it, steps=len(valid_it), verbose=0)
print('> %.3f' % (acc * 100.0))

model_json = model.to_json()
with open("modelmulticat.json", "w") as json_file:
    json_file.write(model_json)

Finally the structure of the trained model is saved to a json file.

The training time with this model is about three minutes on an NVIDIA graphics card. Altogether we use about 6000 images for training and 2000 images for validation. The validation accuracy was 96%, which was above the training accuracy. This suggests slight underfitting; note that dropout and augmentation are only active during training, which can also lower the training accuracy.

Testing the Model

We tested the model with the code below. First, we loaded the image into the variable img with the OpenCV function imread. Right after this, we have to take care of the image layers. OpenCV loads images in BGR order, whereas the Keras ImageDataGenerator used for training delivers images in RGB order, so the red and the blue layers are switched. For this reason, we apply the cvtColor method, which swaps the red and blue layers. The image is then normalized by dividing its pixel values by 255. Finally the predict method is used to predict the category of the image. Figure 4 shows an example of an input image, which is displayed by the matplotlib function imshow. The method predict returns a probability vector predictions. The index with the highest value of the vector corresponds to the category. The category name can be retrieved from the class_indices dictionary of the iterator.

img = cv2.imread(os.path.join(valid_path,"Apfel/cropped_img_592.jpg"),1)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
imshow(img)   
img = np.array(img, dtype=np.float32)
img *= 1.0/255.0
predictions = model.predict(np.expand_dims(img, axis=0))  # add the batch dimension
print(predictions)
result = int(np.argmax(predictions[0]))  # index of the category with the highest probability
print(list(valid_it.class_indices)[result])

We tested a few times with different images and saw that the predictions were quite good.

Figure 4: Prediction Image
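
The step from the probability vector to the category name can also be shown in isolation. The sketch below assumes a class_indices dictionary as flow_from_directory would build it for our six directories (indices are assigned in alphanumeric order of the directory names). The probabilities are made up, and category_from_prediction is a hypothetical helper.

```python
def category_from_prediction(probabilities, class_indices):
    """Map a probability vector to the category name with the highest probability."""
    index_to_name = {index: name for name, index in class_indices.items()}
    best_index = max(range(len(probabilities)), key=lambda i: probabilities[i])
    return index_to_name[best_index], probabilities[best_index]


# Assumed mapping for the six fruit directories.
class_indices = {"Apfel": 0, "Gurke": 1, "Kartoffel": 2, "Orange": 3, "Tomate": 4, "Zwiebel": 5}
probabilities = [0.02, 0.01, 0.05, 0.88, 0.03, 0.01]   # made-up output of predict for one image
print(category_from_prediction(probabilities, class_indices))
```

Returning the probability together with the name makes it possible to reject uncertain predictions, for example by only speaking out the category if the value is above a chosen limit.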

The Raspberry Pi application

The setup of the experiment is shown in Figure 5. The Raspberry Pi 4, a power supply and a socket are mounted on a top-hat rail. On the Raspberry Pi you see a PiFace shield attached. The shield had to be mechanically modified to fit on a Raspberry Pi 4. The shield provides buttons in case they are needed. Additionally we have a relay and a power socket. The relay can be triggered by the PiFace, so that the relay applies 230V to the socket. On top of the construction you find a USB camera.

Figure 5: Experiment Setup

We defined a function getCrop, see code below, which determines the square around the fruit portion of the image. This procedure was already explained above. Here we introduced the variable threshset, with which the user can modify the threshold value of the OpenCV threshold method using keys. This is explained later.

threshset = 100

def getCrop(im):
    global threshset
    imgray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    ret, thresh = cv2.threshold(imgray, threshset, 255, 0)
    contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) >= 1:
        cnts = sorted(contours, key=cv2.contourArea, reverse=True)    
        for cnt in cnts:
            x, y, w, h = cv2.boundingRect(cnt)
            # im.shape is (height, width, channels): compare w with the width and h with the height.
            if w > im.shape[1]*20//100 and w < im.shape[1]*95//100:
                if h > im.shape[0]*20//100 and h < im.shape[0]*95//100:
                    w = max((h, w))
                    h = w
                    return x,y,w
    return 0,0,0

In the beginning we faced the problem that the neural network did not predict very well because we had too few training images. Therefore we introduced a function to easily save badly predicted images. The name of the function is saveimg. It simply saves an image img to a directory whose path contains the parameters dircat and fruit. The image name contains the date and the time.

def saveimg(img, dircat, fruit):
    global croppedfolder
    now = datetime.now()
    dt_string = now.strftime("%d_%m_%Y_%H_%M_%S")
    resized =  np.zeros((image_size[0], image_size[1],3), np.uint8)
    resized = cv2.resize(img, image_size, interpolation = cv2.INTER_AREA)
    cv2.imwrite(os.path.join(croppedfolder, dircat, fruit, str("img_"+dt_string+".jpg")), resized)

Below you find the Raspberry Pi application code. In the beginning it sets up the OpenCV video capture. Inside the while loop, an image frame is taken from the USB camera and copied into the image objectfr. The function getCrop is used to get the fruit portion of the image, and a rectangle is drawn around it. The function putText writes the current value of threshset into the image objectfr as well. The application then shows the modified image on a display, see Figure 6. The OpenCV method waitKey checks for a pressed key. If a key was pressed, the code belonging to that key is executed.

cam = cv2.VideoCapture(0)
img_counter = 0  # counts the images saved with the keys 'a', 'z', 'o' and 'k'
cv2.namedWindow("object")

while True:
    ret, frame = cam.read()
    if not ret:
        print("cam.read something wrong")
        break
    objectfr = frame.copy()
    x,y,w = getCrop(objectfr)
    cv2.rectangle(objectfr, (x,y), (x+w,y+w), (0,255,0), 1)
    cv2.putText(objectfr, "thresh: {}".format(threshset), (10,30),  cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA)
    cv2.imshow("object", objectfr)
    k = cv2.waitKey(1)
    if k & 0xFF == ord('q') :
        break
    elif k & 0xFF == ord('n') :
        resized =  np.zeros((image_size[0], image_size[1],3), np.uint8)
        resized = cv2.resize(frame[y:y+w,x:x+w,:], image_size, interpolation = cv2.INTER_AREA) 
        cv2.imwrite("checkpic.jpg",resized)
        resized = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        resized = np.array(resized, dtype=np.float32)
        resized *= 1.0/255.0
        predictions = model.predict(np.expand_dims(resized, axis=0))  # add the batch dimension
        print(predictions)
        result = int(np.argmax(predictions[0]))  # index of the most probable category
        print(result)
        category = list(valid_it.class_indices)[result]
        print(category)
        os.system("espeak -vde {}".format(category))
    elif k & 0xFF == ord('a'):
        saveimg(frame[y:y+w,x:x+w,:], "Training", "Apfel")
        img_counter += 1
    elif k & 0xFF == ord('z'):
        saveimg(frame[y:y+w,x:x+w,:], "Training", "Zwiebel")
        img_counter += 1
    elif k & 0xFF == ord('o'):
        saveimg(frame[y:y+w,x:x+w,:], "Training", "Orange")
        img_counter += 1                
    elif k & 0xFF == ord('k'):
        saveimg(frame[y:y+w,x:x+w,:], "Training", "Kartoffel")
        img_counter += 1
    elif k & 0xFF == ord('+'):
        threshset += 5
        if threshset > 255:
            threshset = 255
    elif k & 0xFF == ord('-'):
        threshset -= 5
        if threshset < 0:
            threshset = 0
        
        
cam.release()
cv2.destroyAllWindows()

If the key ‘q’ is pressed, then the application stops. If the key ‘n’ is pressed, the image inside the rectangle is taken and the category is predicted with the Keras predict method. The category name is handed over to the espeak application, which speaks out the category on the speaker attached to the Raspberry Pi. The keys ‘a’, ‘z’, ‘o’ and ‘k’ execute the saveimg function with different parameters. The purpose of these keys is that the user can save an image in case of a bad prediction. The next time the model is trained, the saved image will be included in the training data. Finally, we have the ‘+’ and ‘-’ keys, which modify the threshset value. The effect is that the rectangle (Figure 6, green rectangle) is enlarged or downsized due to the shadow on the background.

Figure 6: Displayed Image

Conclusion

The application works amazingly well with the few fruits to predict, considering the relatively small amount of training data. In the beginning we had to retrain the model a couple of times with newly generated images using the application keys described above.

As soon as we take e.g. an apple with different colors, there is a high chance that the prediction fails. In such cases we have to take more images and retrain again.

Acknowledgement

Thanks to Carmen Furch and Armin Weisser for providing the data preparation code and the Raspberry Pi application.

Also special thanks to the University of Applied Science Albstadt-Sigmaringen for offering a classroom and appliances to enable this research.

Centromere Position on a Chromosome Image using a Neural Network

Chromosomes have one short arm and one long arm. The centromere sits in between and links both arms together. Biologists find it convenient if an application can automatically spot the position of the centromere on a chromosome image. For image processing in general, it is useful for an application to know the centromere position because it simplifies the classification of the chromosome type.

With this project we want to show how an application can find the centromere positions by using a neural network. In order to train the neural network, we need sufficient training data. We show here how we created the training data. A position in an image is a coordinate with two numbers. The application must therefore use a neural network with a regression layer as output. In this post we show what kind of neural network we used for retrieving a position from an image.

Creating the Training Data

Previously we created around 3000 images from several complete chromosome images with a tool. We do not go into much detail about this tool. It loads and shows a complete chromosome image with its 46 chromosomes, and as a user we can select a square on this image with the mouse. The content of the square is then saved as a 128×128 chromosome image and as a 128×128 telomere image. Figure 1 shows an example of both images. In this way we created around 3000 chromosome and telomere images from different positions.

Figure 1: Chromosome Image and Telomere Image

Each time we save the chromosome and telomere images, the application updates a csv file with the name of the chromosome image (chrname) and the name of the telomere image (telname) using the write function from the code below. It uses the pandas library to append a row to the end of the csv file with the name f_name.

def write(chrname, telname, x, y): 
  
    if isfile(f_name): 
  
        df = pd.read_csv(f_name, index_col = 0) 
        data = [{'chr': chrname, 'tel': telname, 'x': x, 'y':y}] 
        latest = pd.DataFrame(data) 
        df = pd.concat((df, latest), ignore_index = True, sort = False) 
    else: 
        data = [{'chr': chrname, 'tel': telname, 'x': x, 'y':y}] 
        df = pd.DataFrame(data) 

    df.to_csv(f_name) 

In the code above, you can see that an x and a y value are stored in the csv file as well. This is the position of the centromere on the chromosome image. At this point in time, the position is not known yet. We need a tool in which we can click on each image to mark the centromere position. The code of the tool is shown below. There are two parts. The first part is the callback function click. It is called as soon as the user presses a mouse button or moves the mouse. If the left mouse button is pressed, the current mouse position in the conc window is stored in the variable refPt. The second part of the tool loads a chromosome image from the directory chrdestpath and a telomere image from the directory teldestpath and shows them in a window named conc. The function makecolor (described below) combines both images into one image. The user can select the centromere position with the mouse, and a cross appears at the clicked position, see Figure 2. By pressing the key “s”, the user stores the position refPt in the pandas data frame df. After this, the application loads the next chromosome image from the directory chrdestpath and the next telomere image from the directory teldestpath.

refPt = (0,0)
mouseevent = False
def click(event,x,y,flags,param):
    global refPt
    global mouseevent
    if event == cv2.EVENT_LBUTTONDOWN:
        refPt = (x,y)
        mouseevent = True

cv2.namedWindow('conc')
cv2.setMouseCallback("conc", click)

theEnd = False
theNext = False

img_i=0
imgstart = 0
assert imgstart < imgcount

df = pd.read_csv(f_name, index_col = 0) 

for index, row in df.iterrows():
 
    if img_i < imgstart:
        img_i = img_i + 1
        continue
        
    chrtest = cv2.imread("{}{}".format(chrdestpath,row["chr"]),1)
    teltest = cv2.imread("{}{}".format(teldestpath,row["tel"]),1)
    
    conc = makecolor(chrtest, teltest)
    concresized = np.zeros((conc.shape[0]*2, conc.shape[1]*2,3), np.uint8)
    concresized = cv2.resize(conc, (conc.shape[0]*2,conc.shape[1]*2), interpolation = cv2.INTER_AREA)
    
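    # The window shows the image at double size, so the stored position is doubled here
    # and halved again when it is saved with the key "s".
    refPt = (row["y"]*2,row["x"]*2)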
    cv2.putText(concresized,row["chr"], (2,12), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1, cv2.LINE_AA)

    while True:
        cv2.imshow('conc',concresized)
        key = cv2.waitKey(1)
        if mouseevent == True:
            print(refPt[0], refPt[1])
            concresized = cross(concresized, refPt[0], refPt[1], (255,255,255))
            mouseevent = False
        if key & 0xFF == ord("q") :
            theEnd = True
            break
        if key & 0xFF == ord("s") :
            df.loc[df["chr"] == row["chr"], "x"] = refPt[1]//2
            df.loc[df["chr"] == row["chr"], "y"] = refPt[0]//2
            theNext = True
            break
        if key & 0xFF == ord("n") :
            theNext = True
            break
    if theEnd == True:
        break
    if theNext == True:
        theNext = False
           
df.to_csv(f_name) 
cv2.destroyAllWindows()

Figure 2 shows the cross added to the centromere position selected by the user. This procedure was done around 3000 times on chromosome and telomere images, so the output was a csv file with 3000 chromosome image names, telomere image names, and centromere positions.

Figure 2: Centromere Position on a Chromosome Image

Augmenting the Data

In general, 3000 images are too few to train a neural network, so we augmented the images to obtain more training data. This was done by mirroring all chromosome images (and their centromere positions) on the horizontal axis and on the vertical axis. This increased the number of images to 12000. The code below shows the load_data function, which loads the training data or the validation data into lists.

def load_data(csvname, chrdatapathname, teldatapathname):
    X_train = []
    y_train = []
    
    assert isfile(csvname) == True
    df = pd.read_csv(csvname, index_col = 0) 
    for index, row in df.iterrows():
                          
        chrname = "{}{}".format(chrdatapathname,row["chr"])
        telname = "{}{}".format(teldatapathname,row["tel"])
    
        chrimg = cv2.imread(chrname,1)
        telimg = cv2.imread(telname,1)                  
                 
        X_train.append(makecolor(chrimg, telimg))
        y_train.append((row['x'],row['y']))
    return X_train, y_train

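In the code above you find a makecolor function. makecolor converts the chromosome image and the telomere image to grayscale and copies them into two layers of a new color image. The chromosome image goes into layer 1 (green) and the telomere image into layer 0, which is the blue layer in OpenCV's BGR order and the red layer if the image is interpreted as RGB. See the code of the function makecolor below.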

def makecolor(chromo, telo):

    chromogray = cv2.cvtColor(chromo, cv2.COLOR_BGR2GRAY)
    telogray = cv2.cvtColor(telo, cv2.COLOR_BGR2GRAY)
    
    imgret = np.zeros((imgsize, imgsize,3), np.uint8)
    
    imgret[0:imgsize, 0:imgsize,1] = chromogray
    imgret[0:imgsize, 0:imgsize,0] = telogray
    
    return imgret

Below is the code of the function mirrowdata, which flips the images horizontally or vertically. It uses the parameter flip to control the flipping of the image and its centromere position.

def mirrowdata(data, target, flip=0):
    xdata = []
    ytarget = []

    for picture in data:
        xdata.append(cv2.flip(picture, flip))
        
    for point in target:
        if flip == 0:
            ytarget.append((imgsize-point[0],point[1]))
        if flip == 1:
            ytarget.append((point[0],imgsize-point[1]))
        if flip == -1:
            ytarget.append((imgsize-point[0],imgsize-point[1]))
    
    return  xdata, ytarget
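
The coordinate transformation in mirrowdata can be checked on a tiny example. The following sketch is a minimal illustration in plain Python, not the project's code: flip_rows and flip_point are hypothetical helpers, and positions are assumed to be stored as (row, column). With zero-based pixel indices a mirrored index is size - 1 - index, so the sketch differs by one pixel from imgsize - index in the code above, which is probably negligible for 128×128 images.

```python
def flip_rows(image):
    """Mirror a 2D list top to bottom (what cv2.flip does with flip code 0)."""
    return image[::-1]


def flip_point(point, size, flip):
    """Transform a (row, column) position in the same way the image is flipped."""
    row, col = point
    if flip in (0, -1):      # top-bottom mirror changes the row
        row = size - 1 - row
    if flip in (1, -1):      # left-right mirror changes the column
        col = size - 1 - col
    return row, col


size = 4
image = [[0] * size for _ in range(size)]
image[1][2] = 1                          # mark a made-up centromere at row 1, column 2
flipped = flip_rows(image)
row, col = flip_point((1, 2), size, 0)
print(row, col, flipped[row][col])
```

If the transformed position still points at the marked pixel in the flipped image, the image and its target stay consistent.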

The following code loads the training data into the lists train_data and train_target. train_data contains the color images of the chromosomes and telomeres, and train_target contains the centromere positions. The mirrowdata function is applied twice to the data with different flip parameter settings. After this, the data is converted to numpy arrays. This is needed to normalize the images with the mean function and the standard deviation function. This is done for 10000 of the 12000 images, which form the training data. The same is done with the remaining 2000 images for the validation data.

train_data, train_target = load_data(csvtrainname, chrtrainpath, teltrainpath)
train_mirrow_data, train_mirrow_target = mirrowdata(train_data, train_target, 0)
train_data = train_data + train_mirrow_data
train_target = train_target + train_mirrow_target
train_mirrow_data, train_mirrow_target = mirrowdata(train_data, train_target, 1)
train_data = train_data + train_mirrow_data
train_target = train_target + train_mirrow_target

train_data = np.array(train_data, dtype=np.float32)
train_target = np.array(train_target, dtype=np.float32)

train_data -= train_data.mean()
train_data /= train_data.std()

Modeling and Training the Neural Network

Since we want to feed images into the neural network, we decided to use a neural network with convolution layers. We started with a layer having 32 filters. As input we need images of size imgsize, which is 128 in our case. After each convolution layer we added a max pooling layer with pool_size=(2,2), which halves the width and the height of its input. The output is fed into the next layer. Altogether we have four convolution layers. The number of filters increases towards the later layers. After the fourth layer we flatten the output and feed it into the first dense layer. Then we feed the output into the second dense layer, which has only two neurons. Its activation function is linear. This means we receive two float values, which are supposed to be the position of the centromere. As a loss function we decided to use the mean_absolute_percentage_error.

model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01), padding='same', input_shape=(imgsize, imgsize, 3)))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01), padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01), padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01), padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Flatten())
model.add(Dense(128, activation='relu', kernel_initializer='he_uniform'))
model.add(Dropout(0.1))
model.add(Dense(2, activation='linear'))
opt = Adam(lr=1e-3, decay=1e-3 / 200)
model.compile(loss="mean_absolute_percentage_error", optimizer=opt, metrics=['accuracy'])
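To see how large the flattened layer becomes, the feature map sizes can be followed through the four convolution and pooling stages. The sketch below only does the arithmetic and does not build a model. The helper name feature_map_sizes is hypothetical. It relies on two properties of the model above: padding='same' keeps the width and height unchanged, and each 2x2 max pooling halves them.

```python
def feature_map_sizes(imgsize, filters, pool=2):
    """Return the (height, width, channels) after each conv + pooling stage."""
    sizes = []
    side = imgsize
    for n_filters in filters:
        # 'same' padding keeps the side length, pooling divides it by pool.
        side //= pool
        sizes.append((side, side, n_filters))
    return sizes

sizes = feature_map_sizes(128, [32, 32, 64, 128])
flat_units = sizes[-1][0] * sizes[-1][1] * sizes[-1][2]

print(sizes)       # [(64, 64, 32), (32, 32, 32), (16, 16, 64), (8, 8, 128)]
print(flat_units)  # 8192
```

So the first dense layer receives 8192 values per image, which is where most of the model's weights sit.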

We start the training with the fit method. The input parameters are the array of colored and normalized chromosome images (train_data), the array of centromere positions (train_target), and the validation data (valid_data, valid_target). Three callbacks are defined. EarlyStopping stops the training once there has been no progress for ten epochs. ReduceLROnPlateau lowers the learning rate when the progress stalls for three epochs. ModelCheckpoint saves the weights automatically whenever the model improves.

callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ModelCheckpoint('modelregr.h5', verbose=1, save_best_only=True, save_weights_only=True)
]

model.fit(train_data, train_target, batch_size=20, epochs=50, callbacks=callbacks, verbose=1, validation_data=(valid_data, valid_target))
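The patience setting can be illustrated without Keras. The following is a simplified sketch of patience-based stopping and not the Keras implementation. The function name early_stopping_epoch and the example loss values are made up for illustration.

```python
def early_stopping_epoch(val_losses, patience=10):
    """Return the index of the epoch at which training would stop.

    Training stops once the validation loss has failed to improve
    for `patience` consecutive epochs.
    """
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            # Progress: remember the new best value and reset the counter.
            best = loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                return epoch
    # The loss never stalled long enough, so all epochs were used.
    return len(val_losses) - 1

# Hypothetical validation losses: the best value is reached at epoch 2.
losses = [5.0, 4.0, 3.0, 3.5, 3.2, 3.1, 3.4]
stop_epoch = early_stopping_epoch(losses, patience=3)
```

With a patience of 3, the example stops at epoch 5, three epochs after the last improvement.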

The training took around five minutes on an NVIDIA 2070 graphics card. The training accuracy is 0.9462 and the validation accuracy is 0.9258. The gap indicates slight overfitting. The loss shows the same pattern.
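For reference, the mean absolute percentage error used as the loss can be written in a few lines of numpy. This is a sketch of the usual definition and is not taken from the Keras source. The eps guard against division by zero and the example positions are assumptions made for illustration.

```python
import numpy as np

def mean_absolute_percentage_error(y_true, y_pred, eps=1e-7):
    """Average of |true - predicted| / |true|, expressed in percent."""
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    # eps avoids a division by zero for coordinates that are exactly 0.
    ratio = np.abs(y_true - y_pred) / np.maximum(np.abs(y_true), eps)
    return 100.0 * ratio.mean()

# Hypothetical example: both coordinates are off by 10 percent.
true_pos = [[50.0, 100.0]]
pred_pos = [[45.0, 110.0]]
mape = mean_absolute_percentage_error(true_pos, pred_pos)
```

Because the error is relative, the same deviation in pixels counts more for a position with small coordinate values than for one with large coordinate values.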

Testing

We set a few chromosome and telomere images aside for testing. The images were stored in a test_data array and normalized before prediction. The prediction was done with the following code.

predictions_test = model.predict(test_data, batch_size=50, verbose=1)

predictions_test now contains all predicted centromere positions. Figure 3 shows the positions drawn onto the chromosome images. We can see that the cross is pretty close to the centromere, although there are deviations.

Figure 3: Predicted centromere positions
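The deviations can also be expressed as a number instead of judging them by eye. The sketch below computes the distance in pixels between predicted and marked positions. It assumes that marked positions for the test images are available, which is not shown in this article. The function name pixel_errors and the example coordinates are hypothetical.

```python
import numpy as np

def pixel_errors(predicted, actual):
    """Euclidean distance in pixels between each predicted and marked position."""
    predicted = np.asarray(predicted, dtype=np.float64)
    actual = np.asarray(actual, dtype=np.float64)
    return np.linalg.norm(predicted - actual, axis=1)

# Made-up positions for two images, only to show the calculation.
predicted = [[64.0, 60.0], [30.0, 44.0]]
actual = [[61.0, 64.0], [30.0, 40.0]]
errors = pixel_errors(predicted, actual)

print(errors)         # [5. 4.]
print(errors.mean())  # 4.5
```

In practice predicted would be predictions_test and actual the marked positions of the same test images.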

To display the chromosomes as shown in Figure 3 we use the following showpics function. Note that if you want to use this code, the input images must not be normalized, otherwise you will see a black image.

def showpics(data, target, firstpics=0, lastpics=8):
    chrtail=[]
    pnttail=[]
    columns = 4
    print(data[0].shape)
    for i in range(firstpics, lastpics):
        chrtail.append(data[i])
        pnttail.append(target[i])
    rows = (lastpics-firstpics)//columns
    fig=figure(figsize=(16, 4*rows))
    for i in range(columns*rows):
        point = pnttail[i]
        fig.add_subplot(rows, columns, i+1)
        pic = np.zeros((chrtail[i].shape[0], chrtail[i].shape[1],3), np.uint8)
        pic[0:pic.shape[0], 0:pic.shape[1], 0] = chrtail[i][0:pic.shape[0], 0:pic.shape[1], 0]
        pic[0:pic.shape[0], 0:pic.shape[1], 1] = chrtail[i][0:pic.shape[0], 0:pic.shape[1], 1]
        pic[0:pic.shape[0], 0:pic.shape[1], 2] = chrtail[i][0:pic.shape[0], 0:pic.shape[1], 2]
        imshow(cross(pic, int(point[1]), int(point[0]), (255,255,255)))    
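If only normalized images are at hand, the normalization can be undone before displaying them, provided the mean and standard deviation were kept. The sketch below shows this round trip. The helpers normalize and denormalize are hypothetical and not part of the original code, which normalizes in place without storing the two values.

```python
import numpy as np

def normalize(images):
    """Normalize and also return the statistics needed to undo it."""
    mean, std = images.mean(), images.std()
    return (images - mean) / std, mean, std

def denormalize(normalized, mean, std):
    """Restore displayable 8-bit pixel values from normalized images."""
    restored = normalized * std + mean
    # Round and clip so that float inaccuracies cannot wrap around in uint8.
    return np.clip(np.rint(restored), 0, 255).astype(np.uint8)

# One tiny "image" with three pixel values, just to show the round trip.
raw = np.array([[[0, 128, 255]]], dtype=np.float32)
normalized, mean, std = normalize(raw)
restored = denormalize(normalized, mean, std)
```

The restored array has the dtype uint8 again and can be passed to a display function such as showpics.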

Conclusion

The intention of this project was to show how to use a linear regression output as the last layer of a neural network. We wanted to get a position (coordinates) from an image.

Firstly, we marked the centromere positions on about 3000 chromosome and telomere images with a tool we created. Then we augmented the data by horizontal and vertical flipping, which increased it to 12000 images.

Secondly, we trained a multilayer convolutional neural network with four convolutional layers and two dense layers. The last dense layer has two neurons, one for each coordinate.

The prediction result was fairly good, considering the little effort we spent on optimizing the model. In Figure 3 we can still see that the predicted centromere position is not always on the right spot. We expect improvement once we add more data and optimize the model.