cjuexuan / mynote

237 stars 34 forks source link

hdfs file md5 计算,实现本地与hdfs同步文件 #9

Open cjuexuan opened 8 years ago

cjuexuan commented 8 years ago

需求

本地有文件,hdfs也有文件,如果是同一个文件,则不同步,否则就同步文件;如果本地有的,hdfs无,则上传;如果本地无的,hdfs有,则删除

思考

要判断文件是否相同,就计算并比较两者的md5值

如何算

本来想用hdfs的checksum,但那个是crc,每次写block会去算一下,最后是一组checksum,而本地文件系统默认不会计算这个值 local 后面就直接用流计算的:


  /**
    * Computes the MD5 hex digest of a file stored in HDFS by streaming its bytes.
    *
    * @param path          HDFS path of the file to hash
    * @param configuration hadoop configuration used to resolve the FileSystem
    * @return the 32-char lowercase hex md5 of the file's content
    *
    * NOTE: `FileSystem.get` returns a JVM-wide cached instance keyed by the
    * configuration; the original closed it here, which invalidates it for every
    * other caller in the process. We therefore only close the input stream.
    * On failure the original returned `e.getMessage` as if it were a digest,
    * which made md5 comparisons silently wrong — now the exception propagates.
    */
  def getHdfsFileMd5(path: Path, configuration: Configuration): String = {
    val dfs = FileSystem.get(configuration)
    val in = dfs.open(path)
    try {
      DigestUtils.md5Hex(in)
    } finally {
      // always release the stream, success or failure
      in.close()
    }
  }

  /**
    * Computes the MD5 hex digest of a local file by streaming its bytes.
    *
    * @param file local file to hash
    * @return the 32-char lowercase hex md5 of the file's content
    *
    * The original returned `e.getMessage` on failure, poisoning any md5
    * comparison with an error string; errors now propagate to the caller.
    * try/finally guarantees the stream is closed on every path.
    */
  def getLocalFileMd5(file: File): String = {
    val in = new FileInputStream(file)
    try {
      DigestUtils.md5Hex(in)
    } finally {
      in.close()
    }
  }

设计

按照刚才的思路,可以分成下面几种情况

本地 HDFS 是否相同
文件 文件 相同
文件 文件 不同
文件 文件夹 无需比较
文件夹 文件 无需比较
文件夹 文件夹 无需比较

所以设置一个type:

// Marker trait namespacing the comparison mode between a local path and an HDFS path.
trait Mode

object Mode{
  // Encodes the comparison table above as (localIsFile, hdfsIsFile, md5Matches).
  // NOTE(review): the tuple type does not extend Mode — the trait serves only
  // as a namespace marker here; only the (true, true, *) cases consume the
  // third element.
  type isSameFile = (Boolean,Boolean,Boolean)
}

而在模式匹配中也是这样去使用的


  /**
    * Recursively syncs a local path onto an HDFS path so HDFS mirrors local.
    *
    * Dispatch table on (localIsFile, hdfsIsFile, md5Matches):
    *  - (file, file, same)  → nothing to do
    *  - (file, file, diff)  → overwrite the HDFS copy
    *  - (file, dir,  _)     → delete the HDFS dir, upload the file
    *  - (dir,  file, _)     → reconcile via composeAction
    *  - (dir,  dir,  _)     → reconcile this level, then recurse into subdirs
    *
    * @param localFile     local file or directory (source of truth)
    * @param hdfsPath      HDFS counterpart to bring in sync
    * @param configuration hadoop configuration used to resolve the FileSystem
    */
  def syncHelper(localFile: File, hdfsPath: Path, configuration: Configuration): Unit = {
    // FileSystem.get caches per-configuration; the original re-fetched it
    // inside two match arms, shadowing this one — one lookup suffices.
    val fileSystem = FileSystem.get(configuration)
    val localIsFile = localFile.isFile
    val hdfsIsFile = fileSystem.isFile(hdfsPath)
    // Only compute the md5 when both sides are plain files: a directory has no
    // meaningful md5, and every non-(file,file) case matches `_` on the third
    // element, so short-circuiting cannot change which branch is taken.
    val mode: Mode.isSameFile =
      (localIsFile, hdfsIsFile, localIsFile && hdfsIsFile && sameMd5(localFile, hdfsPath, fileSystem))
    mode match {
      case (true, true, true) ⇒
        logger.info(s"the file :${localFile.getName} in local and hdfs are same one")
      case (true, true, false) ⇒
        logger.debug(s"the file: ${localFile.getName} in local and hdfs have same name,but they are different file")
        // copyFromLocal to hdfs with overwrite
        fileSystem.copyFromLocalFile(false, true, new Path(localFile.getAbsolutePath), hdfsPath)
      case (true, false, _) ⇒
        logger.debug(s"the file: ${localFile.getName} in local is file and in hdfs is dir")
        // first delete the dir in hdfs, then copyFromLocal to hdfs
        fileSystem.delete(hdfsPath, true)
        fileSystem.copyFromLocalFile(false, true, new Path(localFile.getAbsolutePath), hdfsPath)
      case (false, true, _) ⇒
        logger.debug(s"in local this is a dir and in hdfs is a file,upload ${localFile.getName} ")
        // delete/upload/update as computed by the three diff lists
        composeAction(localFile, hdfsPath, fileSystem)
      case (false, false, _) ⇒
        logger.debug(s"both local and hdfs this is dir:${localFile.getName}")
        // reconcile this level (delete / upload / update lists) ...
        composeAction(localFile, hdfsPath, fileSystem)
        // ... then recurse into each local sub-directory so nested trees sync too
        val childrenDir = localFile.listFiles().filter(_.isDirectory)
        val hdfsParent = hdfsPath.toString
        childrenDir.foreach(file ⇒ syncHelper(file, new Path(s"$hdfsParent/${file.getName}"), configuration))
    }
  }

辅助函数

那些需要被删除的:

  /** Names present under hdfsPath but absent in the local directory — candidates for deletion. */
  def needDelete(localFile: File, hdfsPath: Path, configuration: Configuration) = {
    val hdfsNames = FileSystem.get(configuration).listStatus(hdfsPath).map(_.getPath.getName)
    val localNames = localFile.listFiles().map(_.getName)
    hdfsNames.diff(localNames).toList
  }

那些需要被更新:

  /**
    * Names that exist on both sides but whose local and HDFS md5 digests differ —
    * these files must be re-uploaded.
    *
    * The original called `FileSystem.get` three times and `listStatus` twice for
    * the same path; both are hoisted into single calls here (same results,
    * fewer round-trips to the namenode).
    */
  def needUpdate(localFile: File, hdfsPath: Path, configuration: Configuration) = {
    val fileSystem = FileSystem.get(configuration)
    val hdfsStatuses = fileSystem.listStatus(hdfsPath)
    // names present on both sides
    val sharedNames = hdfsStatuses.map(_.getPath.getName)
      .intersect(localFile.listFiles().map(_.getName))

    val localMd5 = localFile.listFiles().filter(_.isFile)
      .filter(file ⇒ sharedNames.contains(file.getName))
      .map(file ⇒ (file.getName, getLocalFileMd5(file)))
    val hdfsMd5 = hdfsStatuses
      .filter(status ⇒ fileSystem.isFile(status.getPath))
      .filter(status ⇒ sharedNames.contains(status.getPath.getName))
      .map(status ⇒ (status.getPath.getName, getHdfsFileMd5(status.getPath, configuration)))
    // a (name, md5) pair in local but not in hdfs means the content differs
    localMd5.diff(hdfsMd5).map(_._1).toList
  }

那些需要被上传的:


  /** Names present in the local directory but absent under hdfsPath — candidates for upload. */
  def needUpload(localFile: File, hdfsPath: Path, configuration: Configuration) = {
    val localNames = localFile.listFiles().map(_.getName)
    val hdfsNames = FileSystem.get(configuration).listStatus(hdfsPath).map(_.getPath.getName)
    localNames.diff(hdfsNames).toList
  }

组合函数:

  /**
    * Reconciles one directory level: deletes HDFS entries missing locally, then
    * uploads entries that are new locally or whose md5 differs.
    *
    * @param localFile  local directory acting as the source of truth
    * @param hdfsPath   HDFS directory to reconcile
    * @param fileSystem FileSystem to operate on — reused for every action; the
    *                   original re-fetched it via FileSystem.get before each
    *                   phase, which is redundant since the parameter is already
    *                   the cached instance for this configuration
    */
  def composeAction(localFile: File, hdfsPath: Path, fileSystem: FileSystem) = {
    val configuration = fileSystem.getConf
    val deleteList = needDelete(localFile, hdfsPath, configuration)
    val uploadList = needUpload(localFile, hdfsPath, configuration)
    val updateList = needUpdate(localFile, hdfsPath, configuration)
    // new files and changed files are both handled by an overwriting upload
    val concatList = uploadList ++ updateList
    val localParent = localFile.getAbsolutePath
    val hdfsParent = hdfsPath.toString
    logger.debug("deleting which file need delete")
    deleteList.foreach(name ⇒ fileSystem.delete(new Path(s"$hdfsParent/$name"), true))
    logger.debug("deleted")
    logger.debug("uploading which file need upload or update")
    concatList.foreach(name ⇒ fileSystem.copyFromLocalFile(false, true,
      new Path(s"$localParent/$name"), new Path(s"$hdfsParent/$name")))
    logger.debug("uploaded")
  }

测试

package

package object sync {
  // Shared fixtures for the specs in this package.
  // NOTE(review): paths are developer-machine specific; `pathLcoal` keeps its
  // original (misspelled) name because the specs reference it by that name.
  lazy val sync = PathSyncer
  lazy val fileMd5Spec = new File(new File("src/test/scala/com/ximalaya/data/sync/Md5Spec.scala").getAbsolutePath)
  lazy val pathHDFS = new Path("/tmp/todd/a")
  lazy val pathLcoal = new File("/Users/cjuexuan/data/testfs/a")
  lazy val resources = Seq("/Users/cjuexuan/conf/hadoop/hadoop/core-site.xml",
    "/Users/cjuexuan/conf/hadoop/hadoop/hdfs-site.xml")

  /** Builds a hadoop Configuration from a list of XML resource paths.
    * Implicit so a Seq[String] can be passed wherever a Configuration is expected. */
  implicit def getHadoopConf(resources: Seq[String]): Configuration = {
    val conf = new Configuration()
    resources.foreach(path ⇒ conf.addResource(new Path(path)))
    conf
  }
}

pathSpec:


import java.io.File

import org.apache.hadoop.fs.Path
import org.scalatest.{FlatSpec, Matchers}

/**
  * Created by todd.chen on 16/3/15.
  * email : todd.chen@ximalaya.com
  */
class PathSpec extends FlatSpec with Matchers {

  // fixtures: an HDFS directory and its local counterpart
  val path = new Path("/tmp/todd")
  val file = new File("/Users/cjuexuan/data/testfs")

  "File in hdfs and not in local" should "delete" in {
    sync.needDelete(file, path, resources).length shouldBe 1
    sync.needDelete(file, path, resources).head shouldBe "user_info"
  }

  "File in local and not in hdfs" should "update" in {
    sync.needUpload(file, path, resources).length shouldBe 1
    sync.needUpload(file, path, resources).head shouldBe "b"
  }

  "File in local and hdfs have diff md5" should "update" in {
    sync.needUpdate(file, path, resources).length shouldBe 1
    sync.needUpdate(file, path, resources).head shouldBe "c"
  }

}

md5Spec:

import org.scalatest.{FlatSpec, Matchers}

import scala.com.ximalaya.data.sync._

/**
  * Created by todd.chen on 16/3/14.
  * email : todd.chen@ximalaya.com
  */
class Md5Spec extends FlatSpec with Matchers {

  "file md5Spec" should "get md5 with String type" in {
    // md5Hex yields a hex string
    sync.getLocalFileMd5(fileMd5Spec) shouldBe a [String]
  }

  "file md5Spec's md5 " should "format to int type with hex" in {
    // a valid md5 hex digest parses as an integer in base 16
    BigInt(sync.getLocalFileMd5(fileMd5Spec), 16) shouldBe a [BigInt]
  }

  "hdfs path" should "get md5 with String type " in {
    sync.getHdfsFileMd5(pathHDFS, getHadoopConf(resources)) shouldBe a [String]
  }

  "Same file in hdfs and lcoalSystem" should "have same md5" in {
    // identical content must hash identically regardless of where it lives
    val md5OfLocal = sync.getLocalFileMd5(pathLcoal)
    val md5OfHdfs = sync.getHdfsFileMd5(pathHDFS, getHadoopConf(resources))
    md5OfLocal shouldBe md5OfHdfs
  }

}

syncSpec:

import java.io.File

import org.apache.hadoop.fs.Path
import org.scalatest.{FlatSpec, Matchers}

/**
  * Created by todd.chen on 16/3/15.
  * email : todd.chen@ximalaya.com
  */
class SyncerSpec extends FlatSpec with Matchers {

  val helper = PathSyncer

  "file in local and in hdfs is same file" should "do nothing" in {
    val local = new File("/Users/cjuexuan/data/testfs/a")
    val hdfs = new Path("/tmp/todd/a")
    val md5Local = helper.getLocalFileMd5(local)
    val md5Before = helper.getHdfsFileMd5(hdfs, resources)
    // precondition: both sides already identical
    md5Local should be (md5Before)
    helper.syncHelper(local, hdfs, resources)
    // sync must leave the HDFS copy untouched
    val md5After = helper.getHdfsFileMd5(hdfs, resources)
    md5Local should be (md5After)
  }

  "file in local and in hdfs with same name and  diff file" should "be update" in {
    val local = new File("/Users/cjuexuan/data/testfs/b")
    val hdfs = new Path("/tmp/todd/b")
    val md5Local = helper.getLocalFileMd5(local)
    val md5Before = helper.getHdfsFileMd5(hdfs, resources)
    // precondition: contents differ
    md5Local should not be md5Before
    helper.syncHelper(local, hdfs, resources)
    // after sync the HDFS copy must match the local one
    val md5After = helper.getHdfsFileMd5(hdfs, resources)
    md5Local should be (md5After)
  }

  "in local is file and same path in hdfs is dir" should " be remove hdfs dir and upload local file" in {
    val local = new File("/Users/cjuexuan/data/testfs/b")
    val hdfs = new Path("/tmp/todd/b")
    val md5Local = helper.getLocalFileMd5(local)
    helper.syncHelper(local, hdfs, resources)
    // the dir must have been replaced by the uploaded file
    val md5Hdfs = helper.getHdfsFileMd5(hdfs, resources)
    md5Local should be (md5Hdfs)
  }

  "in local is dir and in hdfs is file" should "be remove hdfs file and upload local dir" in {
    val local = new File("/Users/cjuexuan/data/testfs")
    val hdfs = new Path("/tmp/todd/testfs")
    helper.syncHelper(local, hdfs, resources)
  }

  "both in local and hdfs is dir" should "sync to same" in {
    val local = new File("/Users/cjuexuan/data/testfs")
    val hdfs = new Path("/tmp/todd/testfs")
    helper.syncHelper(local, hdfs, resources)
  }

}