Advertisement

Generative Adversarial Networks for Unsupervised Cluste

阅读量:

作者:禅与计算机程序设计艺术

1.简介

许多数据科学领域都在深入探索利用机器学习(ML)方法实现高效、自动化处理和可靠的数据分析。传统聚类方法主要依赖于标注数据的标签信息,而标注数据的获取往往需要耗费大量的人力和时间成本;此外,面对海量数据时,获取足够的标签信息也面临着巨大挑战。因此,如何能够利用数据自身的分布特性进行自动生成聚类划分,成为一个当前研究的热点方向。

研究者们持续探索新的聚类模型以缓解传统方法的局限性。其中,生成式对抗网络(GANs)已被广泛认可为一种有效解决方案,通过学习生成数据的属性,能够识别出潜在的几何结构,并作为聚类任务的核心工具。

近年来,基于GANs的无监督聚类技术已经受到了越来越广泛的重视。然而,如何将GANs应用到实际问题中,仍然是一个值得深入探索的研究方向。本文主要介绍了生成对抗网络(Generative Adversarial Networks,GAN)这一变体,并探讨了其在聚类问题中的应用。

2.基本概念与术语说明

2.1 GAN概述

生成对抗网络(GAN)由两个相互竞争的神经网络构成,分别负责生成样本数据并识别真实样本。此消彼长的对抗在生成器和判别器网络之间展开,经过不断迭代最终达到动态平衡状态。由于训练过程是非监督的,因而无需人工标注数据,即可完成分类任务或生成所需图像。

GAN模型最早提出于2014年提出的ImageNet图像识别竞赛中,为图像合成领域带来了显著进展。随着GAN的出现,其在多个图像处理领域也获得了广泛关注,包括生成物体模型、风格迁移以及生成图像等多个方面。

2.2 生成器网络

在GAN模型体系中,生成器网络扮演着核心角色。该网络由编码器和解码器两个关键模块构成。其中,编码器模块负责将原始数据映射为高维特征向量,而解码器模块则负责将这些高维特征向量还原为原始数据的形式。

在GAN模型中,生成器网络可以被视为模拟目标函数,其主要目标是最大化真实数据分布与生成数据分布之间的差异。由此可知,生成器网络的输出应当呈现出与真实数据分布相似的特征。

2.3 判别器网络

判别器网络,也被称为鉴别器(discriminator),是生成对抗网络(GAN)体系中的另一个关键组成部分。它本质上是一个二元分类器,其主要职责是根据输入的图片数据,判断该图片是否属于原始数据集,或者是否为生成器生成的虚假样本。判别器网络通常由多个层构成,每一层都可以被看作是一个具有特定功能的神经元集合。在经过一系列的特征提取和判别过程后,判别器网络的最终输出是一个概率值,表示输入样本被判定为真实样本的可能性。

在训练过程中,判别器网络的主要目标是追求其在识别真实数据和生成数据之间的判别能力。具体而言,判别器网络旨在通过输出高置信的判断来识别真实数据,并通过输出低置信的判断来识别生成数据。

3.核心算法原理和具体操作步骤及数学公式讲解

3.1 数据准备

在处理聚类任务时,首先需要准备好数据集,这里假设存在N个训练样本{x(i)},其中i的取值范围为1到N,每个样本的维度为D。这些样本还具有对应的类别标签{z(i)},其中i同样是1到N。在应用GAN进行聚类时,需要对数据集进行预处理,具体包括归一化、标准化等操作。

3.2 模型搭建

3.2.1 生成器网络

生成器网络主要由编码器模块和解码器模块构成,具体包括多个卷积层、池化层以及全连接层。编码器模块的功能是将原始数据转换为高维空间中的特征表示,而解码器模块则将这些特征还原为原始数据的表达形式。

生成器旨在使其输出符合真实数据分布。因此,在损失函数设计中,将真实数据分布与生成数据分布之间的距离作为损失函数的依据。具体而言,通过交叉熵损失函数计算原始数据分布与生成数据分布之间的距离,并对计算结果进行求和取平均,随后进行反向传播。

3.2.2 判别器网络

判别器网络由多层结构构成,每一层都是一个全连接层,可以直观地理解为一堆神经元。其目标是希望其输出能够准确地反映输入数据是来自真实样本还是生成样本。判别器网络的参数是需要优化的目标,在训练过程中会动态调整更新。

在GAN中,损失函数一般为交叉熵,计算真实数据分布和生成数据分布之间的距离。如下公式:

在式子中,L(\delta_k^{(i)}, D_{\theta}(x^{i}))表示判别器网络在输入样本x^{(i)}时识别正确标签\delta_k^{(i)}的损失函数,而R(G)则衡量生成器网络生成数据的性能,当生成器的性能R(G)低于设定阈值\epsilon时,训练过程将终止。

3.2.3 总体框架

在GAN中,生成器网络生成假设的目标函数,而判别器网络则基于真实数据分布与生成数据分布之间的距离进行判别。在训练过程中,生成器网络试图生成原始数据分布无法被区分的数据,同时判别器网络通过分析生成数据的质量提供反馈信息,这种动态过程有助于提升生成器网络对数据分布真实性的逼近能力。

3.3 训练过程

3.3.1 参数初始化

首先,随机初始化参数 \thetaG

3.3.2 训练阶段

在训练阶段,需要按照下面的步骤进行:

通过系统性优化判别器网络结构,保持生成器参数不变,对判别器网络进行训练,使其在识别真实数据时具有高判别置信度,在识别生成数据时具有低判别置信度。

通过固定判别器来优化生成器网络,使其进行训练,使其生成的数据更贴近真实数据分布。

  1. 更新判别器参数:将步1的优化后的参数赋值给判别器。

  2. 更新生成器参数:将步2的优化后的参数赋值给生成器。

直到满足结束条件,如训练次数、误差下降阈值或其他终止条件。

3.4 聚类过程

GAN模型训练完成后,判别器网络将样本的概率值作为判别依据,据此对每个样本进行分类并赋予相应的类标签,从而获得聚类结果。具体而言,我们设定一个阈值τ,若样本属于生成数据,则将其归为第c类;反之,则归为第c+1类。随着τ值的逐步提升,聚类效果显著增强。

3.5 代码实现

下面演示基于TensorFlow 2.0的GAN-clustering实现。第一步,导入必要的库。

复制代码
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    import numpy as np
    
      
      
      
    
    代码解读

3.5.1 数据集准备

假设有如下数据集:

复制代码
    X =... # 训练数据
    Z =... # 对应的标签
    
      
    
    代码解读

其中,X表示各个样本的特征矩阵,Z表示样本对应的标签。为了使网络训练得以顺利进行,对数据的预处理是必要的,具体来说,我们采用以下方式进行数据预处理。

复制代码
    def preprocess(X):
    X -= np.mean(X, axis=0)
    X /= np.std(X, axis=0) + 1e-7
    return X
    
    X = preprocess(X)
    
      
      
      
      
      
    
    代码解读

3.5.2 模型搭建

搭建GAN模型需要先定义编码器(Generator)和解码器(Discriminator)。

3.5.2.1 编码器

编码器模块(Generator)是一个卷积神经网络,通过随机噪声向量进行编码,经过多层卷积、ReLU激活函数和下采样操作,能够生成与目标图像尺寸一致的像素级特征图。

复制代码
    latent_dim = 128
    
    generator_in = keras.Input(shape=(latent_dim,))
    h = layers.Dense(7*7*256)(generator_in)
    h = layers.Reshape((7, 7, 256))(h)
    h = layers.BatchNormalization()(h)
    h = layers.LeakyReLU()(h)
    h = layers.Conv2DTranspose(filters=128, kernel_size=5, strides=2, padding='same')(h)
    h = layers.BatchNormalization()(h)
    h = layers.LeakyReLU()(h)
    img = layers.Conv2DTranspose(filters=1, kernel_size=5, activation='tanh', padding='same')(h)
    generator = keras.Model(inputs=[generator_in], outputs=[img])
    
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读
3.5.2.2 解码器

解码器(Discriminator)是另一种卷积神经网络,它接收原始图像作为输入,随后通过多个卷积、ReLU激活和池化层的组合,最终生成一个概率值,用于判断输入图像是否为真实图像。

复制代码
    discriminator_in = keras.Input(shape=(None, None, 1))
    h = discriminator_in
    h = layers.Conv2D(filters=64, kernel_size=5, strides=2, padding='same')(h)
    h = layers.LeakyReLU()(h)
    h = layers.Dropout(rate=0.3)(h)
    h = layers.Conv2D(filters=128, kernel_size=5, strides=2, padding='same')(h)
    h = layers.LeakyReLU()(h)
    h = layers.Dropout(rate=0.3)(h)
    h = layers.Flatten()(h)
    out = layers.Dense(units=1, activation='sigmoid')(h)
    discriminator = keras.Model(inputs=[discriminator_in], outputs=[out])
    
      
      
      
      
      
      
      
      
      
      
    
    代码解读

3.5.3 训练过程

首先,定义损失函数。

复制代码
    cross_entropy = keras.losses.BinaryCrossentropy()
    
    def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss
    
    def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)
    
      
      
      
      
      
      
      
      
      
    
    代码解读

然后,定义优化器和训练循环。

复制代码
    adam = keras.optimizers.Adam(lr=0.0002, beta_1=0.5)
    
    @tf.function
    def train_step(batch_imgs):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        noise = tf.random.normal([batch_imgs.shape[0], latent_dim])
        generated_images = generator(noise, training=True)
    
        real_output = discriminator(batch_imgs, training=True)
        fake_output = discriminator(generated_images, training=True)
    
        d_loss = discriminator_loss(real_output, fake_output)
        g_loss = generator_loss(fake_output)
    
    gradients_of_generator = gen_tape.gradient(g_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(d_loss, discriminator.trainable_variables)
    
    adam.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    adam.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))
    
    return d_loss, g_loss
    
    for epoch in range(epochs):
    batch_num = len(X) // batch_size
    for batch_id in range(batch_num):
        img_batch = X[batch_id*batch_size:(batch_id+1)*batch_size] / 255.
        d_loss, g_loss = train_step(img_batch)
    
    print('epoch %d: loss_d %.3f, loss_g %.3f' % (epoch, d_loss, g_loss))
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

3.5.4 聚类过程

最后,基于判别器网络的输出结果,对样本进行归类,并计算各簇之间的欧氏距离,选择最小距离对应的簇标签。

复制代码
    def clustering(X):
    model = keras.models.load_model('./gan_cluster.h5')
    cluster_prob = model.predict(preprocess(X).reshape((-1,) + input_shape))[..., 0]
    Z = np.argmax(np.bincount(np.digitize(cluster_prob, bins=np.arange(0, 1, 1/n_clusters)))) - 1
    return Z
    
      
      
      
      
    
    代码解读

3.5.5 完整的代码

复制代码
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    import numpy as np
    
    input_shape = (28, 28, 1)
    latent_dim = 128
    n_clusters = 10
    epochs = 100
    batch_size = 32
    
    X =... # 训练数据
    Z =... # 对应的标签
    
    def preprocess(X):
    X -= np.mean(X, axis=0)
    X /= np.std(X, axis=0) + 1e-7
    return X
    
    generator_in = keras.Input(shape=(latent_dim,))
    h = layers.Dense(7*7*256)(generator_in)
    h = layers.Reshape((7, 7, 256))(h)
    h = layers.BatchNormalization()(h)
    h = layers.LeakyReLU()(h)
    h = layers.Conv2DTranspose(filters=128, kernel_size=5, strides=2, padding='same')(h)
    h = layers.BatchNormalization()(h)
    h = layers.LeakyReLU()(h)
    img = layers.Conv2DTranspose(filters=1, kernel_size=5, activation='tanh', padding='same')(h)
    generator = keras.Model(inputs=[generator_in], outputs=[img])
    
    discriminator_in = keras.Input(shape=(None, None, 1))
    h = discriminator_in
    h = layers.Conv2D(filters=64, kernel_size=5, strides=2, padding='same')(h)
    h = layers.LeakyReLU()(h)
    h = layers.Dropout(rate=0.3)(h)
    h = layers.Conv2D(filters=128, kernel_size=5, strides=2, padding='same')(h)
    h = layers.LeakyReLU()(h)
    h = layers.Dropout(rate=0.3)(h)
    h = layers.Flatten()(h)
    out = layers.Dense(units=1, activation='sigmoid')(h)
    discriminator = keras.Model(inputs=[discriminator_in], outputs=[out])
    
    cross_entropy = keras.losses.BinaryCrossentropy()
    
    def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss
    
    def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)
    
    adam = keras.optimizers.Adam(lr=0.0002, beta_1=0.5)
    
    @tf.function
    def train_step(batch_imgs):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        noise = tf.random.normal([batch_imgs.shape[0], latent_dim])
        generated_images = generator(noise, training=True)
    
        real_output = discriminator(batch_imgs, training=True)
        fake_output = discriminator(generated_images, training=True)
    
        d_loss = discriminator_loss(real_output, fake_output)
        g_loss = generator_loss(fake_output)
    
    gradients_of_generator = gen_tape.gradient(g_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(d_loss, discriminator.trainable_variables)
    
    adam.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    adam.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))
    
    return d_loss, g_loss
    
    
    # 训练过程
    for epoch in range(epochs):
    batch_num = len(X) // batch_size
    for batch_id in range(batch_num):
        img_batch = X[batch_id*batch_size:(batch_id+1)*batch_size].reshape((-1,) + input_shape) / 255.
        d_loss, g_loss = train_step(img_batch)
    
    print('epoch %d: loss_d %.3f, loss_g %.3f' % (epoch, d_loss, g_loss))
    
    # 保存模型
    generator.save('gan_generator.h5')
    discriminator.save('gan_discriminator.h5')
    
    # 加载模型
    model = keras.Sequential([generator, discriminator])
    model.compile(optimizer='adam', loss=['binary_crossentropy'])
    
    # 对测试集进行聚类
    test_data =... # 测试数据
    Z_pred = clustering(test_data)
    print('accuracy:', np.mean(Z == Z_pred))
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

全部评论 (0)

还没有任何评论哟~