import tensorflow as tf
import numpy as np
import os

import zconfig
import utils

# Note: this code targets the pre-1.0 TensorFlow API
# (tf.merge_all_summaries, tf.initialize_all_variables, tf.train.SummaryWriter, ...).


class RBM(object):

    """ Restricted Boltzmann Machine implementation using TensorFlow.
    The interface of the class is sklearn-like.
    """

    def __init__(self, num_visible, num_hidden, visible_unit_type='bin', main_dir='rbm', model_name='rbm_model',
                 gibbs_sampling_steps=1, learning_rate=0.01, batch_size=10, num_epochs=10, stddev=0.1, verbose=0):

        """
        :param num_visible: number of visible units
        :param num_hidden: number of hidden units
        :param visible_unit_type: type of the visible units: 'bin' (binary) or 'gauss' (gaussian)
        :param main_dir: main directory to put the models, data and summary directories
        :param model_name: name of the model, used to save data
        :param gibbs_sampling_steps: optional, default 1
        :param learning_rate: optional, default 0.01
        :param batch_size: optional, default 10
        :param num_epochs: optional, default 10
        :param stddev: optional, default 0.1. Ignored if visible_unit_type is not 'gauss'
        :param verbose: level of verbosity. optional, default 0
        """

        self.num_visible = num_visible
        self.num_hidden = num_hidden
        self.visible_unit_type = visible_unit_type
        self.main_dir = main_dir
        self.model_name = model_name
        self.gibbs_sampling_steps = gibbs_sampling_steps
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.stddev = stddev
        self.verbose = verbose

        self.models_dir, self.data_dir, self.summary_dir = self._create_data_directories()
        self.model_path = self.models_dir + self.model_name

        self.W = None
        self.bh_ = None
        self.bv_ = None

        self.w_upd8 = None
        self.bh_upd8 = None
        self.bv_upd8 = None

        self.encode = None

        self.loss_function = None

        self.input_data = None
        self.hrand = None
        self.vrand = None
        self.validation_size = None

        self.tf_merged_summaries = None
        self.tf_summary_writer = None
        self.tf_session = None
        self.tf_saver = None
    def fit(self, train_set, validation_set=None, restore_previous_model=False):

        """ Fit the model to the training data.

        :param train_set: training set
        :param validation_set: validation set. optional, default None
        :param restore_previous_model:
            if true, a previously trained model
            with the same name as this model is restored from disk to continue training.

        :return: self
        """

        if validation_set is not None:
            self.validation_size = validation_set.shape[0]

        self._build_model()

        with tf.Session() as self.tf_session:

            self._initialize_tf_utilities_and_ops(restore_previous_model)
            self._train_model(train_set, validation_set)
            self.tf_saver.save(self.tf_session, self.model_path)

    def _initialize_tf_utilities_and_ops(self, restore_previous_model):

        """ Initialize TensorFlow operations: summaries, init operations, saver, summary_writer.
        Restore a previously trained model if the flag restore_previous_model is true.
        """

        self.tf_merged_summaries = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        self.tf_session.run(init_op)

        if restore_previous_model:
            self.tf_saver.restore(self.tf_session, self.model_path)

        self.tf_summary_writer = tf.train.SummaryWriter(self.summary_dir, self.tf_session.graph_def)

    def _train_model(self, train_set, validation_set):

        """ Train the model.

        :param train_set: training set
        :param validation_set: validation set. optional, default None

        :return: self
        """

        for i in range(self.num_epochs):
            self._run_train_step(train_set)

            if validation_set is not None:
                self._run_validation_error_and_summaries(i, validation_set)

    def _run_train_step(self, train_set):

        """ Run a training step. A training step consists of randomly shuffling the
        training set, dividing it into batches, and running the variable update nodes
        for each batch.

        :param train_set: training set

        :return: self
        """

        np.random.shuffle(train_set)

        batches = list(utils.gen_batches(train_set, self.batch_size))
        updates = [self.w_upd8, self.bh_upd8, self.bv_upd8]

        for batch in batches:
            self.tf_session.run(updates, feed_dict=self._create_feed_dict(batch))

    def _run_validation_error_and_summaries(self, epoch, validation_set):

        """ Run the summaries and error computation on the validation set.

        :param epoch: current epoch
        :param validation_set: validation data

        :return: self
        """

        result = self.tf_session.run([self.tf_merged_summaries, self.loss_function],
                                     feed_dict=self._create_feed_dict(validation_set))

        summary_str = result[0]
        err = result[1]

        self.tf_summary_writer.add_summary(summary_str, epoch)  # record the summary at the current epoch

        if self.verbose == 1:
            print("Validation cost at step %s: %s" % (epoch, err))

    def _create_feed_dict(self, data):

        """ Create the dictionary of data to feed to TensorFlow's session during training.

        :param data: training/validation set batch

        :return: dictionary(self.input_data: data, self.hrand: random_uniform, self.vrand: random_uniform)
        """

        return {
            self.input_data: data,
            self.hrand: np.random.rand(data.shape[0], self.num_hidden),
            self.vrand: np.random.rand(data.shape[0], self.num_visible)
        }
    def _build_model(self):

        """ Build the Restricted Boltzmann Machine model in TensorFlow.

        :return: self
        """

        self.input_data, self.hrand, self.vrand = self._create_placeholders()
        self.W, self.bh_, self.bv_ = self._create_variables()

        hprobs0, hstates0, vprobs, hprobs1, hstates1 = self.gibbs_sampling_step(self.input_data)
        positive = self.compute_positive_association(self.input_data, hprobs0, hstates0)

        nn_input = vprobs

        for step in range(self.gibbs_sampling_steps - 1):
            hprobs, hstates, vprobs, hprobs1, hstates1 = self.gibbs_sampling_step(nn_input)
            nn_input = vprobs

        negative = tf.matmul(tf.transpose(vprobs), hprobs1)

        self.encode = hprobs1  # encoded data, used by the transform method

        self.w_upd8 = self.W.assign_add(self.learning_rate * (positive - negative))
        self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0))
        self.bv_upd8 = self.bv_.assign_add(self.learning_rate * tf.reduce_mean(self.input_data - vprobs, 0))

        self.loss_function = tf.sqrt(tf.reduce_mean(tf.square(self.input_data - vprobs)))

        _ = tf.scalar_summary("cost", self.loss_function)
    def _create_placeholders(self):

        """ Create the TensorFlow placeholders for the model.

        :return: tuple(input(shape(None, num_visible)),
                       hrand(shape(None, num_hidden)),
                       vrand(shape(None, num_visible)))
        """

        x = tf.placeholder('float', [None, self.num_visible], name='x-input')
        hrand = tf.placeholder('float', [None, self.num_hidden], name='hrand')
        vrand = tf.placeholder('float', [None, self.num_visible], name='vrand')

        return x, hrand, vrand

    def _create_variables(self):

        """ Create the TensorFlow variables for the model.

        :return: tuple(weights(shape(num_visible, num_hidden)),
                       hidden bias(shape(num_hidden)),
                       visible bias(shape(num_visible)))
        """

        W = tf.Variable(tf.random_normal((self.num_visible, self.num_hidden), mean=0.0, stddev=0.01), name='weights')
        bh_ = tf.Variable(tf.zeros([self.num_hidden]), name='hidden-bias')
        bv_ = tf.Variable(tf.zeros([self.num_visible]), name='visible-bias')

        return W, bh_, bv_

    def gibbs_sampling_step(self, visible):

        """ Perform one step of Gibbs sampling.

        :param visible: activations of the visible units

        :return: tuple(hidden probs, hidden states, visible probs,
                       new hidden probs, new hidden states)
        """

        hprobs, hstates = self.sample_hidden_from_visible(visible)
        vprobs = self.sample_visible_from_hidden(hprobs)
        hprobs1, hstates1 = self.sample_hidden_from_visible(vprobs)

        return hprobs, hstates, vprobs, hprobs1, hstates1

    def sample_hidden_from_visible(self, visible):

        """ Sample the hidden units from the visible units.
        This is the Positive phase of the Contrastive Divergence algorithm.

        :param visible: activations of the visible units

        :return: tuple(hidden probabilities, hidden binary states)
        """

        hprobs = tf.nn.sigmoid(tf.matmul(visible, self.W) + self.bh_)
        hstates = utils.sample_prob(hprobs, self.hrand)

        return hprobs, hstates

    def sample_visible_from_hidden(self, hidden):

        """ Sample the visible units from the hidden units.
        This is the Negative phase of the Contrastive Divergence algorithm.

        :param hidden: activations of the hidden units

        :return: visible probabilities
        """

        visible_activation = tf.matmul(hidden, tf.transpose(self.W)) + self.bv_

        if self.visible_unit_type == 'bin':
            vprobs = tf.nn.sigmoid(visible_activation)

        elif self.visible_unit_type == 'gauss':
            vprobs = tf.truncated_normal((1, self.num_visible), mean=visible_activation, stddev=self.stddev)

        else:
            vprobs = None

        return vprobs

    def compute_positive_association(self, visible, hidden_probs, hidden_states):

        """ Compute positive associations between visible and hidden units.

        :param visible: visible units
        :param hidden_probs: hidden units probabilities
        :param hidden_states: hidden units states

        :return: positive association = dot(visible.T, hidden)
        """

        if self.visible_unit_type == 'bin':
            positive = tf.matmul(tf.transpose(visible), hidden_states)

        elif self.visible_unit_type == 'gauss':
            positive = tf.matmul(tf.transpose(visible), hidden_probs)

        else:
            positive = None

        return positive
    def _create_data_directories(self):

        """ Create the three directories for storing respectively the models,
        the data generated by training and the TensorFlow's summaries.

        :return: tuple of strings(models_dir, data_dir, summary_dir)
        """

        self.main_dir = self.main_dir + '/' if self.main_dir[-1] != '/' else self.main_dir

        models_dir = zconfig.models_dir + self.main_dir
        data_dir = zconfig.data_dir + self.main_dir
        summary_dir = zconfig.summary_dir + self.main_dir

        for d in [models_dir, data_dir, summary_dir]:
            if not os.path.isdir(d):
                os.makedirs(d)  # makedirs also creates missing parent directories

        return models_dir, data_dir, summary_dir
    def transform(self, data, name='train', save=False):

        """ Transform data according to the model.

        :type data: array_like
        :param data: Data to transform

        :type name: string, default 'train'
        :param name: Identifier for the data that is being encoded

        :type save: boolean, default 'False'
        :param save: If true, save data to disk

        :return: transformed data
        """

        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.model_path)

            encoded_data = self.encode.eval(self._create_feed_dict(data))

            if save:
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data

    def load_model(self, shape, gibbs_sampling_steps, model_path):

        """ Load a trained model from disk. The shape of the model
        (num_visible, num_hidden) and the number of gibbs sampling steps
        must be known in order to restore the model.

        :param shape: tuple(num_visible, num_hidden)
        :param gibbs_sampling_steps: number of gibbs sampling steps used to train the model
        :param model_path: path to the saved model on disk

        :return: self
        """

        self.num_visible, self.num_hidden = shape[0], shape[1]
        self.gibbs_sampling_steps = gibbs_sampling_steps
        self.model_path = model_path  # so later calls (transform, get_model_parameters) restore this checkpoint

        self._build_model()

        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        with tf.Session() as self.tf_session:

            self.tf_session.run(init_op)
            self.tf_saver.restore(self.tf_session, model_path)
    def get_model_parameters(self):

        """ Return the model parameters in the form of numpy arrays.

        :return: model parameters
        """

        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.model_path)

            return {
                'W': self.W.eval(),
                'bh_': self.bh_.eval(),
                'bv_': self.bv_.eval()
            }

    def get_weights_as_images(self, width, height, outdir='img/', n_images=10, img_type='grey'):

        """ Create and save the weights of the hidden units with respect to the
        visible units as images.

        :param width: width of each image in pixels
        :param height: height of each image in pixels
        :param outdir: output directory for the images, relative to self.data_dir
        :param n_images: number of images to generate
        :param img_type: 'grey' or 'color' (see utils.gen_image)

        :return: self
        """

        outdir = self.data_dir + outdir

        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.model_path)

            weights = self.W.eval()

            perm = np.random.permutation(self.num_hidden)[:n_images]

            for p in perm:
                w = np.array([i[p] for i in weights])
                image_path = outdir + self.model_name + '_{}.png'.format(p)
                utils.gen_image(w, width, height, image_path, img_type)
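To make the contrastive divergence step concrete, here is the same CD-1 update written in plain numpy (an illustrative sketch, not part of the gist; the variable names mirror the methods above, and only the 'bin' visible unit type is shown):

import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

rng = np.random.RandomState(0)
v0 = (rng.rand(10, 6) > 0.5).astype(float)   # batch of 10 binary visible vectors
W = rng.normal(0.0, 0.01, (6, 4))            # shape (num_visible, num_hidden)
bh, bv, lr = np.zeros(4), np.zeros(6), 0.01

hprobs0 = sigmoid(v0.dot(W) + bh)                              # sample_hidden_from_visible
hstates0 = (hprobs0 > rng.rand(*hprobs0.shape)).astype(float)  # sample_prob
vprobs = sigmoid(hprobs0.dot(W.T) + bv)                        # sample_visible_from_hidden
hprobs1 = sigmoid(vprobs.dot(W) + bh)                          # second hidden pass

positive = v0.T.dot(hstates0)                # compute_positive_association ('bin')
negative = vprobs.T.dot(hprobs1)
W += lr * (positive - negative)              # w_upd8
bh += lr * (hprobs0 - hprobs1).mean(axis=0)  # bh_upd8
bv += lr * (v0 - vprobs).mean(axis=0)        # bv_upd8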
# A second file from this gist: the same class before the refactoring applied above.

from tensorflow.python.framework import ops
import tensorflow as tf
import numpy as np
import os

import zconfig
import utils


class RBM(object):

    """ Restricted Boltzmann Machine implementation using TensorFlow.
    The interface of the class is sklearn-like.
    """

    def __init__(self, nvis, nhid, vis_type='bin', directory_name='rbm', model_name='', gibbs_k=1, learning_rate=0.01,
                 batch_size=10, n_iter=10, stddev=0.1, verbose=0):
        self.nvis = nvis
        self.nhid = nhid
        self.vis_type = vis_type
        self.directory_name = directory_name
        self.model_name = model_name
        self.gibbs_k = gibbs_k
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.n_iter = n_iter
        self.stddev = stddev
        self.verbose = verbose

        # Directories paths
        self.directory_name = self.directory_name + '/' if self.directory_name[-1] != '/' else self.directory_name

        self.models_dir = zconfig.models_dir + self.directory_name
        self.data_dir = zconfig.data_dir + self.directory_name
        self.summary_dir = zconfig.summary_dir + self.directory_name

        # Create dirs
        for d in [self.models_dir, self.data_dir, self.summary_dir]:
            if not os.path.isdir(d):
                os.makedirs(d)  # makedirs also creates missing parent directories

        if self.model_name == '':
            # Assign model complete name
            self.model_name = 'rbm-{}-{}-{}-{}-{}-{}'.format(
                self.nvis, self.nhid, self.n_iter, self.batch_size, self.learning_rate, self.batch_size)

        # ############################# #
        #   Computational graph nodes   #
        # ############################# #

        # Model parameters
        self.W = None
        self.bh_ = None
        self.bv_ = None

        self.w_upd8 = None
        self.bh_upd8 = None
        self.bv_upd8 = None

        self.encode = None

        self.cost = None

        self.hrand = None
        self.vrand = None
        self.validation_size = None

        self.sess = None
        self.saver = None
    def _create_graph(self):

        # Symbolic variables
        self.x = tf.placeholder('float', [None, self.nvis], name='x-input')
        self.hrand = tf.placeholder('float', [None, self.nhid], name='hrand')
        self.vrand = tf.placeholder('float', [None, self.nvis], name='vrand-train')

        # Biases
        self.bh_ = tf.Variable(tf.zeros([self.nhid]), name='hidden-bias')
        self.bv_ = tf.Variable(tf.zeros([self.nvis]), name='visible-bias')

        self.W = tf.Variable(tf.random_normal((self.nvis, self.nhid), mean=0.0, stddev=0.01), name='weights')

        nn_input = self.x

        # Initialization
        hprobs0 = None
        hprobs = None
        positive = None
        vprobs = None
        hprobs1 = None
        hstates1 = None

        for step in range(self.gibbs_k):

            # Positive Contrastive Divergence phase
            hprobs = tf.nn.sigmoid(tf.matmul(nn_input, self.W) + self.bh_)
            hstates = utils.sample_prob(hprobs, self.hrand)

            # Compute positive associations in step 0
            if step == 0:
                hprobs0 = hprobs  # save the activation probabilities of the first step

                if self.vis_type == 'bin':
                    positive = tf.matmul(tf.transpose(nn_input), hstates)

                elif self.vis_type == 'gauss':
                    positive = tf.matmul(tf.transpose(nn_input), hprobs)

            # Negative Contrastive Divergence phase
            visible_activation = tf.matmul(hprobs, tf.transpose(self.W)) + self.bv_

            if self.vis_type == 'bin':
                vprobs = tf.nn.sigmoid(visible_activation)

            elif self.vis_type == 'gauss':
                vprobs = tf.truncated_normal((1, self.nvis), mean=visible_activation, stddev=self.stddev)

            # Sample again from the hidden units
            hprobs1 = tf.nn.sigmoid(tf.matmul(vprobs, self.W) + self.bh_)
            hstates1 = utils.sample_prob(hprobs1, self.hrand)

            # Use the reconstructed visible units as input for the next step
            nn_input = vprobs

        negative = tf.matmul(tf.transpose(vprobs), hprobs1)

        self.encode = hprobs  # encoded data

        self.w_upd8 = self.W.assign_add(self.learning_rate * (positive - negative))
        self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0))
        self.bv_upd8 = self.bv_.assign_add(self.learning_rate * tf.reduce_mean(self.x - vprobs, 0))

        self.cost = tf.sqrt(tf.reduce_mean(tf.square(self.x - vprobs)))
        _ = tf.scalar_summary("cost", self.cost)
    def fit(self, trX, vlX=None, restore_previous_model=False):

        if vlX is not None:
            self.validation_size = vlX.shape[0]

        # Reset tensorflow's default graph
        ops.reset_default_graph()

        self._create_graph()

        merged = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.saver = tf.train.Saver()

        with tf.Session() as self.sess:

            self.sess.run(init_op)

            if restore_previous_model:
                # Restore previous model
                self.saver.restore(self.sess, self.models_dir + self.model_name)
                # Change model name
                self.model_name += '-restored{}'.format(self.n_iter)

            # Write tensorflow summaries to summary dir
            writer = tf.train.SummaryWriter(self.summary_dir, self.sess.graph_def)

            for i in range(self.n_iter):

                # Randomly shuffle the input
                np.random.shuffle(trX)

                batches = list(utils.gen_batches(trX, self.batch_size))

                for batch in batches:
                    self.sess.run([self.w_upd8, self.bh_upd8, self.bv_upd8],
                                  feed_dict={self.x: batch,
                                             self.hrand: np.random.rand(batch.shape[0], self.nhid),
                                             self.vrand: np.random.rand(batch.shape[0], self.nvis)})

                if i % 5 == 0:

                    # Record summary data
                    if vlX is not None:

                        feed = {self.x: vlX,
                                self.hrand: np.random.rand(self.validation_size, self.nhid),
                                self.vrand: np.random.rand(self.validation_size, self.nvis)}

                        result = self.sess.run([merged, self.cost], feed_dict=feed)
                        summary_str = result[0]
                        err = result[1]

                        writer.add_summary(summary_str, i)  # record the summary at the current iteration

                        if self.verbose == 1:
                            print("Validation cost at step %s: %s" % (i, err))

            # Save trained model
            self.saver.save(self.sess, self.models_dir + self.model_name)
    def transform(self, data, name='train', gibbs_k=1, save=False, models_dir=''):

        """ Transform data according to the model.

        :type data: array_like
        :param data: Data to transform

        :type name: string, default 'train'
        :param name: Identifier for the data that is being encoded

        :type gibbs_k: int, default 1
        :param gibbs_k: Gibbs sampling steps

        :type save: boolean, default 'False'
        :param save: If true, save data to disk

        :return: transformed data
        """

        with tf.Session() as self.sess:

            # Restore trained model
            self.saver.restore(self.sess, self.models_dir + self.model_name)

            # Return the output of the encoding layer
            encoded_data = self.encode.eval({self.x: data,
                                             self.hrand: np.random.rand(data.shape[0], self.nhid),
                                             self.vrand: np.random.rand(data.shape[0], self.nvis)})

            if save:
                # Save transformation to output file
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data

    def load_model(self, shape, gibbs_k, model_path):

        """ Load a trained model from disk.

        :param shape: tuple(nvis, nhid)
        :param gibbs_k: number of gibbs sampling steps used to train the model
        :param model_path: path to the saved model on disk

        :return: self
        """

        self.nvis, self.nhid = shape[0], shape[1]
        self.gibbs_k = gibbs_k

        self._create_graph()

        # Initialize variables
        init_op = tf.initialize_all_variables()

        # Add ops to save and restore all the variables
        self.saver = tf.train.Saver()

        with tf.Session() as self.sess:

            self.sess.run(init_op)

            # Restore previous model
            self.saver.restore(self.sess, model_path)

    def get_model_parameters(self):

        """ Return the model parameters in the form of numpy arrays.

        :return: model parameters
        """

        with tf.Session() as self.sess:

            # Restore trained model
            self.saver.restore(self.sess, self.models_dir + self.model_name)

            return {
                'W': self.W.eval(),
                'bh_': self.bh_.eval(),
                'bv_': self.bv_.eval()
            }

    def get_weights_as_images(self, width, height, outdir='img/', n_images=10, img_type='grey'):

        outdir = self.data_dir + outdir

        with tf.Session() as self.sess:

            self.saver.restore(self.sess, self.models_dir + self.model_name)

            weights = self.W.eval()

            perm = np.random.permutation(self.nhid)[:n_images]

            for p in perm:
                w = np.array([i[p] for i in weights])
                image_path = outdir + self.model_name + '_{}.png'.format(p)
                utils.gen_image(w, width, height, image_path, img_type)
# utils.py

from scipy import misc
import tensorflow as tf
import numpy as np


def sample_prob(probs, rand):

    """ Takes a tensor of probabilities (as from a sigmoidal activation)
    and samples from all the distributions.

    :param probs: tensor of probabilities
    :param rand: tensor (of the same shape as probs) of random values

    :return: binary sample of probabilities
    """

    return tf.nn.relu(tf.sign(probs - rand))
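
# Note: sample_prob(probs, rand) is the TensorFlow equivalent of the numpy
# expression (probs > rand).astype(float): tf.sign(probs - rand) is +1 where
# probs > rand and -1 (or 0) elsewhere, and tf.nn.relu clips that to {0, 1},
# so each unit switches on with probability equal to its activation.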
def gen_batches(data, batch_size):

    """ Divide input data into batches.

    :param data: input data
    :param batch_size: size of each batch

    :return: data divided into batches
    """

    data = np.array(data)

    for i in range(0, data.shape[0], batch_size):
        yield data[i:i + batch_size]


def gen_image(img, width, height, outfile, img_type='grey'):

    """ Save a flattened weight vector as a width x height image.
    For 'color' images the vector is interpreted as three stacked channels.
    """

    assert len(img) == width * height or len(img) == width * height * 3

    if img_type == 'grey':
        misc.imsave(outfile, img.reshape(width, height))

    elif img_type == 'color':
        misc.imsave(outfile, img.reshape(3, width, height))
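Note that scipy.misc.imsave was removed in SciPy 1.2, so with a recent SciPy the function above fails at import time. A sketch of a replacement built on the imageio package (an assumption, not part of the gist):

import imageio

def gen_image(img, width, height, outfile, img_type='grey'):
    if img_type == 'grey':
        imageio.imwrite(outfile, img.reshape(width, height))
    elif img_type == 'color':
        # imageio expects channels last, so move the channel axis
        imageio.imwrite(outfile, img.reshape(3, width, height).transpose(1, 2, 0))
    # (casting float weight vectors to uint8 first may be needed)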
# zconfig.py

models_dir = 'models/'   # directory in which to save/restore models
data_dir = 'data/'       # directory in which to store algorithm data
summary_dir = 'logs/'    # directory in which to store TensorFlow summaries
One question: during your batch-wise _run_train_step, if one wants to get the self.encode variable for every input (not batch-wise), how can one do that?
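One way to do this (a minimal sketch, assuming rbm is a fitted instance of the refactored class and trX is the full training array): self.encode is an ordinary graph node, so it can be evaluated on the whole dataset in one pass, which is what transform() already does.

import tensorflow as tf

# After rbm.fit(trX) has run, the simplest route is transform(), which feeds
# the whole array at once rather than batch-wise:
encoded_all = rbm.transform(trX)

# Equivalently, evaluate the node directly in a fresh session:
with tf.Session() as sess:
    rbm.tf_saver.restore(sess, rbm.model_path)
    encoded_all = sess.run(rbm.encode, feed_dict=rbm._create_feed_dict(trX))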
@blackecho can you provide sample code showing how to use it?
I think something like this should work :)
from RBM import RBM
from tensorflow.examples.tutorials.mnist import input_data
import tensorflow as tf

# Loading in the mnist data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
trX, trY, teX, teY = mnist.train.images, mnist.train.labels, mnist.test.images, mnist.test.labels

RBM_visible_sizes = 784
RBM_hidden_sizes = 600

# Since we are training, set input as training data
inpX = trX

rbm = RBM(RBM_visible_sizes, RBM_hidden_sizes)
rbm.fit(inpX)
I think there is an error in the method _create_data_directories
The correct paths should be:
models_dir = self.main_dir + zconfig.models_dir
data_dir = self.main_dir + zconfig.data_dir
summary_dir = self.main_dir + zconfig.summary_dir
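For what it's worth, the underlying bug is the module name: the method refers to config.models_dir while the module imported is zconfig. Once that is fixed, either concatenation order works; a quick sketch of what the two layouts produce, using the values from zconfig.py above:

import zconfig

main_dir = 'rbm/'

# original order: grouped by artifact type
print(zconfig.models_dir + main_dir)   # models/rbm/

# suggested order: grouped by model
print(main_dir + zconfig.models_dir)   # rbm/models/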
Hello, thanks for this implementation. One question: why are you using the reduce_mean function when updating the biases on lines 204 and 205 of your first example? For instance, shouldn't self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0)) be self.bh_upd8 = self.bh_.assign_add(self.learning_rate * (hprobs0 - hprobs1)) instead? I base this question on the very beginning of this video (look at his formulas for the bias updates): https://www.youtube.com/watch?v=S0kFFiHzR8M
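(A plausible answer, based on what the code itself implies: hprobs0 - hprobs1 has shape (batch_size, num_hidden), while bh_ has shape (num_hidden,), so some reduction over the batch axis is needed before assign_add can be applied at all. The video's formula is written for a single training example; averaged over a mini-batch of size $m$ it becomes

$$\Delta b_j = \epsilon \cdot \frac{1}{m} \sum_{k=1}^{m} \left( p(h_j = 1 \mid v^{(k)}) - p(h_j = 1 \mid \tilde{v}^{(k)}) \right)$$

which is exactly tf.reduce_mean(hprobs0 - hprobs1, 0). Note that the weight update sums the associations over the batch instead of averaging them, so the effective step sizes of weights and biases differ by a factor of the batch size.)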
Hi,
Could anyone please share sample code that uses this class to train an RBM?
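A minimal usage sketch (my own, not the author's; it assumes the refactored class above is saved as rbm.py next to utils.py and zconfig.py, and uses random binary data so it runs without any downloads):

import numpy as np
from rbm import RBM  # hypothetical module name for the first file in this gist

# Toy binary data: 500 training and 100 validation samples of 784 "pixels"
trX = (np.random.rand(500, 784) > 0.5).astype(np.float32)
vlX = (np.random.rand(100, 784) > 0.5).astype(np.float32)

rbm = RBM(num_visible=784, num_hidden=64, num_epochs=5, batch_size=10, verbose=1)
rbm.fit(trX, validation_set=vlX)

encoded = rbm.transform(trX, name='train')  # shape (500, 64)
params = rbm.get_model_parameters()         # {'W': ..., 'bh_': ..., 'bv_': ...}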