En este post seguiremos donde lo dejamos sobre las Convolutional Neural Networks. Si os acordáis del anterior tema, nos centramos en el funcionamiento de la capa más importante: la Convolucion.
¿Pero eso es todo? ¿Solo existe esta famosa capa en las CNN? No, otra de las capas que también se usa mucho junto a la convolución es la capa de pooling.
Capa de Pooling:
La capa de pooling es una capa bastante sencilla y está la vamos a encontrar prácticamente en todas las arquitecturas de CNN. Esta capa, actuá de la misma forma que lo hace la convolución (cogiendo trozos de la entrada) pero en este caso realiza una operación que no tiene parámetros (siempre es la misma operación). Por lo tanto no hay que aprender ningún peso a corregir.
Veamos la siguiente figura para ver como actúa:
Como veis por ahora hace un mecanismo muy similar a la capa de convolución… ¿Entonces que la hace distinta? La forma en la que mezcla los datos y en que no aprende ningún peso.
Existen básicamente dos formas distintas en las que encontraremos esta capa:
- Max-pooling: En este caso la operación que se realiza será la de coger el máximo.
- Avg-pooling: En este caso la operación que se realiza será la media.
¿Que beneficios tiene la capa de pooling?
- Aporta invariancia a traslación a la capa de Convolución.
- Es muy rápida de computar.
- Reduce la dimensionalidad de la entrada.
¿Las operaciones son bastantes simples no? Veamos que diferencias nos puede ofrecer cada una de estas operaciones.
Max-pooling | Avg-pooling |
|
|
Hasta aquí podríamos pensar que es bastante simple, pues bien la implementación no lo es ya tanto… Trabajar con algoritmos que aplican una «sliding windows» acostumbra a ser sinónimo de complejidad. Eso se debe a que como estas ventanas se solapan entre sí existen neuronas que pueden ser contribuir en más de una ventana. Provocando que la derivada sea la suma de todas estas contribuciones.
Max-pooling:
En este apartado nos vamos a centrar en el forward y backward de la operación de max-pooling.
El forward es bastante simple, pero no nos es suficiente con coger el valor máximo porque de ser así ¿Que neurona es la que deberemos corregir en el backward?
Veamos en pseudo-código y para una ventana en concreto:
En el caso del forward:
Vamos a hacer dos cosas:
- Poner en la salida (y) el valor máximo.
- Guardarnos la posición de esa neurona para que podamos corregir los pesos después.
1 2 3 4 5 6 7 8 9 |
def windowForwardMaxPooling(x): max_v = - Inf max_i = [-1, -1] for i in range(x.w): for j in range(x.h): if max_v <= x[i, j]: max_v = x[i, j] max_i = [i, j] return max_v, max_i |
En el caso del backward:
Volveremos a usar la posición de la neurona activada y solo le propagaremos las correcciones a esta.
Recordar que la derivada del máximo (y = max(x)) es:
\frac{\partial y}{\partial x_i} = \begin{cases}
0, & si\ i \neq argmax(x) \\
1, & si\ i = argmax(x)
\end{cases}
1 2 3 4 |
def windowBackwardMaxPooling(x, dout, max_i): dx[:x.w, :x.h] = 0 dx[max_i] = dout return dx |
Avg-pooling:
En este apartado nos vamos a centrar en el forward y backward de la operación de avg-pooling.
En este caso todas las neuronas contribuyen por igual.
En el caso del forward:
Solamente deberemos realizar la media de todos los datos.
1 2 3 4 5 6 |
def windowAvgPooling(x): acc = 0 for i in range(x.w): for j in range(x.h): acc += x[i, j] return acc / (x.w*x.h) |
En el caso del backward:
El gradiente pasará por todos los datos, lo único que hay que tener en cuenta es que la derivada de y = \frac{1}{N} \sum_i x_i respecto x_i es \frac{1}{N}.
1 2 3 |
def windowBackwardAvgPooling(x, dout, max_i): dx[:x.w, :x.h] = dout / (x.w*x.h) return dx |
Código completo:
Bien ahora es hora de coger todas las anteriores ideas y pasarlas al codigo que nos permitirá usarlo en nuestro framework. Por un tema de optimización, vamos a separar el código en Python y Cython. Como siempre la parte Python hará de wrapper y Cython será el codigo compilado de forma que nos irá «algo» más rápido.
Python:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
from .layer import Layer from ..backend.initializer import Initializer import numpy as np from .cython import pooling2d class Pooling2D(Layer): def __init__(self, node, type_pooling='max', pool_size=(2,2), stride=(1, 1), padding='valid', params=None): if params is None: params = {} self.type_pooling = type_pooling self.pool_size = tuple(pool_size) if isinstance(stride, (list, tuple)): self.stride = tuple(stride) else: self.stride = (stride, stride) self.padding = padding if self.padding == 'valid': self.padding_size = (0, 0) elif self.padding == 'same': self.padding_size = (self.pool_size[0] // 2, self.pool_size[1] // 2) super(Pooling2D, self).__init__(node, params=params) def computeSize(self): super(Pooling2D, self).computeSize() return (self.in_size[0][0] - self.pool_size[0] + 2*self.padding_size[0])//self.stride[0] + 1, \ (self.in_size[0][1] - self.pool_size[1] + 2*self.padding_size[1])//self.stride[1] + 1, \ self.in_size[0][-1] def compile(self): super(Pooling2D, self).compile() if len(self.in_size[0]) < 2: self.num_dim = 1 else: self.num_dim = self.in_size[0][2] def forward(self, inputs): super(Pooling2D, self).forward(inputs) input = np.pad(inputs[0], [(0, 0), (self.padding_size[0], self.padding_size[0]), (self.padding_size[1], self.padding_size[1]), (0, 0)], mode='constant') if self.type_pooling == 'max': out, self.values.mask = pooling2d.nb_forward_max(input, self.pool_size, self.stride) elif self.type_pooling == 'mean': out = pooling2d.nb_forward_mean(input, self.pool_size, self.stride) return out def derivatives(self, doutput): if self.type_pooling == 'max': dx = pooling2d.nb_derivatives_max(doutput, tuple(self.in_size[0]), self.values.mask, self.stride) elif self.type_pooling == 'mean': dx = pooling2d.nb_derivatives_mean(doutput, tuple(self.in_size[0]), self.pool_size, self.stride) return dx[:, self.padding_size[0]:(self.in_size[0][0] - self.padding_size[0]), self.padding_size[1]:(self.in_size[0][1] - self.padding_size[1]), :] def save(self, h5_container): layer_json = super(Pooling2D, self).save(h5_container) layer_json['attributes']['type_pooling'] = self.type_pooling layer_json['attributes']['pool_size'] = self.pool_size layer_json['attributes']['stride'] = self.stride layer_json['attributes']['padding'] = self.padding layer_json['attributes']['padding_size'] = self.padding_size return layer_json def load(self, data, h5_container): super(Pooling2D, self).load(data, h5_container) self.type_pooling = data['attributes']['type_pooling'] self.pool_size = tuple(data['attributes']['pool_size']) self.stride = tuple(data['attributes']['stride']) self.padding = data['attributes']['padding'] self.padding_size = tuple(data['attributes']['padding_size']) |
Cython:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
import cython cimport cython import numpy as np cimport numpy as np from numpy.math cimport INFINITY from types cimport FLOAT64, UINT from cython.parallel import prange,parallel cimport openmp from libc.stdlib cimport abort, malloc, free # max @cython.wraparound(False) @cython.boundscheck(False) def nb_forward_max(np.ndarray[FLOAT64, ndim=4] inputv, tuple pool_size, tuple stride): cdef unsigned int pool_size0 = pool_size[0], pool_size1 = pool_size[1], \ stride0 = stride[0], stride1 = stride[1], \ num_dim = inputv.shape[3], \ batch_size = inputv.shape[0] cdef unsigned int out_size0 = (inputv.shape[1] - pool_size0) / stride0 + 1, \ out_size1 = (inputv.shape[2] - pool_size1) / stride1 + 1 cdef np.ndarray[UINT, ndim=5] mask = np.empty(shape=[batch_size, out_size0, out_size1, num_dim, 2], dtype=np.uint) cdef np.ndarray[FLOAT64, ndim=4] out = np.empty(shape=[batch_size, out_size0, out_size1, num_dim], dtype=np.float64) cdef unsigned int b, i, j, m, kw, kh, n cdef unsigned int iin, jin cdef double blockInput cdef double maxv cdef long int maxi, maxj for b in prange(batch_size, nogil=True): for i in range(out_size0): for j in range(out_size1): iin = i*stride0 jin = j*stride1 for m in range(num_dim): maxv = - INFINITY maxi = -1 maxj = -1 for kw in range(pool_size0): for kh in range(pool_size1): blockInput = inputv[b, iin + kw, jin + kh, m] if maxi == -1 or maxv < blockInput: maxv = blockInput maxi = kw maxj = kh mask[b, i, j, m, 0] = maxi mask[b, i, j, m, 1] = maxj out[b, i, j, m] = maxv return out, mask @cython.wraparound(False) @cython.boundscheck(False) def nb_derivatives_max(np.ndarray[FLOAT64, ndim=4] doutput, tuple input_size, np.ndarray[UINT, ndim=5] mask, tuple stride): cdef unsigned int out_size0 = doutput.shape[1], out_size1 = doutput.shape[2], \ stride0 = stride[0], stride1 = stride[1], \ num_dim = doutput.shape[3], \ batch_size = doutput.shape[0] cdef np.ndarray[FLOAT64, ndim=4] dx = np.zeros(shape=[batch_size, input_size[0], input_size[1], num_dim], dtype=np.float64) cdef unsigned int b, i, j, m, n cdef unsigned int idx, iin, jin cdef double blockInput cdef unsigned int iin_kw, jin_kh for b in prange(batch_size, nogil=True): for i in range(out_size0): for j in range(out_size1): iin = i*stride0 jin = j*stride1 for m in range(num_dim): iin_kw = iin + mask[b, i, j, m, 0] jin_kh = jin + mask[b, i, j, m, 1] dx[b, iin_kw, jin_kh, m] = doutput[b, i, j, m] return dx # mean @cython.wraparound(False) @cython.boundscheck(False) def nb_forward_mean(np.ndarray[FLOAT64, ndim=4] inputv, tuple pool_size, tuple stride): cdef unsigned int pool_size0 = pool_size[0], pool_size1 = pool_size[1], \ stride0 = stride[0], stride1 = stride[1], \ num_dim = inputv.shape[3], \ batch_size = inputv.shape[0] cdef unsigned int out_size0 = (inputv.shape[1] - pool_size0) / stride0 + 1, \ out_size1 = (inputv.shape[2] - pool_size1) / stride1 + 1 cdef np.ndarray[FLOAT64, ndim=4] out = np.empty(shape=[batch_size, out_size0, out_size1, num_dim], dtype=np.float64) cdef unsigned int b, i, j, m, kw, kh, n cdef unsigned int idx, iin, jin cdef double blockInput cdef double den = pool_size0*pool_size1 for b in prange(batch_size, nogil=True): for i in range(out_size0): for j in range(out_size1): iin = i*stride0 jin = j*stride1 for m in range(num_dim): out[b, i, j, m] = 0. for kw in range(pool_size0): for kh in range(pool_size1): out[b, i, j, m] += inputv[b, iin + kw, jin + kh, m] out[b, i, j, m] /= den return out @cython.wraparound(False) @cython.boundscheck(False) def nb_derivatives_mean(np.ndarray[FLOAT64, ndim=4] doutput, tuple input_size, tuple pool_size, tuple stride): cdef unsigned int pool_size0 = pool_size[0], pool_size1 = pool_size[1], \ out_size0 = doutput.shape[1], out_size1 = doutput.shape[2], \ stride0 = stride[0], stride1 = stride[1], \ num_dim = doutput.shape[3], \ batch_size = doutput.shape[0] cdef np.ndarray[FLOAT64, ndim=4] dx = np.zeros(shape=[batch_size, input_size[0], input_size[1], num_dim], dtype=np.float64) cdef unsigned int b, i, j, m, kw, kh, n cdef unsigned int idx, iin, jin cdef double blockInput cdef unsigned int iin_kw, jin_kh cdef double den = pool_size0*pool_size1 cdef double dx_p for b in prange(batch_size, nogil=True): for i in range(out_size0): for j in range(out_size1): iin = i*stride0 jin = j*stride1 for m in range(num_dim): dx_p = doutput[b, i, j, m] / den for kw in range(pool_size0): for kh in range(pool_size1): iin_kw = iin + kw jin_kh = jin + kh dx[b, iin_kw, jin_kh, m] += dx_p return dx |
Y hasta aquí ha sido todo, con esto cerramos la base de las redes convolucionales.
Espero que os haya gustado este post 😉