
spark compact on table


Implementing the COMPACT TABLE command

  • The basic requirement is to support the statement: COMPACT TABLE test1 INTO 500 FILES;
  • If a partitionSpec is added, only the files under the specified partition directory are compacted;
  • If the INTO fileNum FILES clause is omitted, the table's files are merged into files of roughly 128 MB each (see the usage sketch after this list).
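
A minimal usage sketch of the target syntax, assuming the patched Spark build is on the classpath and an existing SparkSession named spark; the table name test1 and partition column dt are placeholders:

// Merge all files of test1 into exactly 500 files.
spark.sql("COMPACT TABLE test1 INTO 500 FILES")

// Only compact the files under the dt='2021-01-01' partition directory.
spark.sql("COMPACT TABLE test1 PARTITION (dt='2021-01-01') INTO 10 FILES")

// No INTO ... FILES clause: merge into files of roughly 128 MB each.
spark.sql("COMPACT TABLE test1")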

Code reference:

Implementation steps:

1. Add the grammar rules in SqlBase.g4:

statement
| COMPACT TABLE target=tableIdentifier partitionSpec?
    (INTO fileNum=INTEGER_VALUE FILES)?                            #compactTable

ansiNonReserved
| FILES

nonReserved
| FILES

//--SPARK-KEYWORD-LIST-START
FILES: 'FILES';

2. Compile the ANTLR grammar: rebuild the sql/catalyst module (for example with build/sbt catalyst/compile) so that ANTLR regenerates the parser classes, including the CompactTableContext used below.

3. Add a visitCompactTable method in SparkSqlParser.scala:

override def visitCompactTable(
    ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
  val table = visitTableIdentifier(ctx.tableIdentifier())
  // Optional INTO fileNum FILES clause.
  val filesNum = if (ctx.INTEGER_VALUE() != null) {
    Some(ctx.INTEGER_VALUE().getText)
  } else {
    None
  }
  // Optional PARTITION (...) clause, kept as the raw text.
  val partition = if (ctx.partitionSpec() != null) {
    Some(ctx.partitionSpec().getText)
  } else {
    None
  }
  // scalastyle:off println
  println(s"visitCompactTable of table $table with partitionSpec: $partition into fileNum: $filesNum")
  // scalastyle:on println
  CompactTableCommand(table, filesNum, partition)
}
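
A quick sanity check of the new rule is to run a statement through the session's SQL parser and confirm it yields the new command; a minimal sketch, assuming a SparkSession named spark built from the patched sources (CompactTableCommand is the class added in step 4 below):

val plan = spark.sessionState.sqlParser.parsePlan(
  "COMPACT TABLE test1 PARTITION (dt='2021-01-01') INTO 10 FILES")
assert(plan.isInstanceOf[CompactTableCommand])
assert(plan.asInstanceOf[CompactTableCommand].filesNum.contains("10"))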

4. Add the CompactTableCommand class:

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class CompactTableCommand(table: TableIdentifier,
  filesNum: Option[String],
  partitionSpec: Option[String])
  extends RunnableCommand {

  // Default target file size for compaction: 128 MB
  private val defaultSize = 128 * 1024 * 1024

  override def output: Seq[Attribute] = Seq(
    AttributeReference("COMPACT_TABLE", StringType, nullable = false)()
  )

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.catalog.setCurrentDatabase(table.database.getOrElse("default"))

    // Name of a temporary staging table, made unique with a timestamp
    val tempTableName = "`" + table.identifier + "_" + System.currentTimeMillis() + "`"

    val originDataFrame = sparkSession.table(table.identifier)
    val partitions = filesNum match {
      case Some(files) => files.toInt
      case None => (sparkSession.sessionState
        .executePlan(originDataFrame.queryExecution.logical)
        .optimizedPlan.stats.sizeInBytes / defaultSize).toInt + 1
    }
    // scalastyle:off println
    println(partitions, tempTableName)

    // A partitionSpec was given: only compact the files under the specified partition directory.
    if (partitionSpec.nonEmpty) {

      sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

      // Turn the raw spec, e.g. PARTITION(dt='2021-01-01',hr='12'),
      // into a filter expression: dt='2021-01-01' AND hr='12'
      val conditionExpr = partitionSpec.get.trim
        .replaceAll("(?i)^partition\\s*\\(", "")
        .dropRight(1)
        .replace(",", " AND ")
      
      println(conditionExpr)

      originDataFrame.where(conditionExpr).repartition(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable(tempTableName)

      sparkSession.table(tempTableName).write
        .mode(SaveMode.Overwrite)
        .insertInto(table.identifier)
    } else {
      // No partitionSpec: compact the whole table; without INTO fileNum FILES
      // the files are merged into chunks of roughly 128 MB each.
      originDataFrame.repartition(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable(tempTableName)

      sparkSession.table(tempTableName)
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable(table.identifier)
    }

    // sparkSession.sql(s"DROP TABLE ${tempTableName}")

    Seq(Row(s"compact table ${table.identifier} finished."))
  }
}
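
An end-to-end sketch, assuming Spark has been rebuilt with the changes above and a SparkSession named spark; the table name and sizes are placeholders, and Dataset.inputFiles is used to count the files backing the table:

// Create a table backed by many small files (200 partitions => ~200 files).
spark.range(0, 1000000, 1, 200)
  .write
  .mode("overwrite")
  .saveAsTable("test1")
println(spark.table("test1").inputFiles.length)   // ~200

// Compact into 10 files; the command returns a single status row.
spark.sql("COMPACT TABLE test1 INTO 10 FILES").show(truncate = false)
println(spark.table("test1").inputFiles.length)   // expect ~10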

