4. Code reference:
Implementation steps:
Step 1: Add the grammar rules to SqlBase.g4.
// In the existing statement rule, add a new alternative labeled #compactTable:
statement
    ...
    | COMPACT TABLE target=tableIdentifier partitionSpec?
        (INTO fileNum=INTEGER_VALUE FILES)?                        #compactTable
    ...

// Register FILES as a non-reserved keyword in both keyword lists:
ansiNonReserved
    ...
    | FILES
    ...

nonReserved
    ...
    | FILES
    ...

// Define the new token in the keyword section:
//--SPARK-KEYWORD-LIST-START
FILES: 'FILES';
// (If the grammar in your Spark version does not already define COMPACT,
// declare it here the same way: COMPACT: 'COMPACT';)
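With these rules in place the parser accepts statements of the following shape (test_table is a placeholder name; both the partition spec and the INTO ... FILES clause are optional):

COMPACT TABLE test_table;
COMPACT TABLE test_table INTO 10 FILES;
COMPACT TABLE test_table PARTITION (dt='2023-01-01') INTO 10 FILES;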
Step 2: Recompile the ANTLR grammar so the generated parser and visitor classes pick up the new rule and token.
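In a Spark source tree this is normally done through the antlr4-maven-plugin; a typical invocation (the module path and flags here are assumptions, adjust to your branch) is:

build/mvn -pl sql/catalyst -DskipTests antlr4:antlr4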
Step 3: Add a visitCompactTable method in SparkSqlParser.scala:
override def visitCompactTable(
    ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
  val table = visitTableIdentifier(ctx.tableIdentifier())
  // INTO ... FILES is optional; keep the raw token text when present
  val filesNum = Option(ctx.INTEGER_VALUE()).map(_.getText)
  // PARTITION (...) is optional; getText returns the clause with no whitespace
  val partition = Option(ctx.partitionSpec()).map(_.getText)
  // scalastyle:off println
  println(s"visitCompactTable of table $table with partitionSpec: $partition into fileNum: $filesNum")
  // scalastyle:on println
  CompactTableCommand(table, filesNum, partition)
}
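A quick way to sanity-check the wiring is to parse a statement directly; a sketch (the table name is a placeholder):

val plan = spark.sessionState.sqlParser.parsePlan(
  "COMPACT TABLE test_table PARTITION (dt='2023-01-01') INTO 10 FILES")
assert(plan.isInstanceOf[CompactTableCommand])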
Step 4: Add the CompactTableCommand class:
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class CompactTableCommand(
    table: TableIdentifier,
    filesNum: Option[String],
    partitionSpec: Option[String]) extends RunnableCommand {

  // target size per output file: 128 MB
  private val defaultSize = 128 * 1024 * 1024

  override def output: Seq[Attribute] = Seq(
    AttributeReference("COMPACT_TABLE", StringType, nullable = false)()
  )

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.catalog.setCurrentDatabase(table.database.getOrElse("default"))
    // temporary table name, made unique with a timestamp
    val tempTableName = "`" + table.identifier + "_" + System.currentTimeMillis() + "`"
    val originDataFrame = sparkSession.table(table.identifier)
    // use the requested file count, or derive one from the optimized plan's
    // size statistics so that each output file is roughly defaultSize
    val partitions = filesNum match {
      case Some(files) => files.toInt
      case None => (sparkSession.sessionState
        .executePlan(originDataFrame.queryExecution.logical)
        .optimizedPlan.stats.sizeInBytes / defaultSize).toInt + 1
    }
    // scalastyle:off println
    println(s"$partitions, $tempTableName")
    if (partitionSpec.nonEmpty) {
      // a partitionSpec was given: compact only the files of the specified
      // partitions, overwriting just those partitions dynamically
      sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      // turn the PARTITION(...) clause text into a WHERE predicate; the text
      // contains no whitespace because it comes from getText
      val conditionExpr = partitionSpec.get.trim
        .replaceAll("(?i)^partition\\(", "")
        .dropRight(1)
        .replace(",", " AND ")
      println(conditionExpr)
      originDataFrame.where(conditionExpr)
        .repartition(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable(tempTableName)
      sparkSession.table(tempTableName)
        .write
        .mode(SaveMode.Overwrite)
        .insertInto(table.identifier)
    } else {
      // no partitionSpec: rewrite the whole table into `partitions` files
      // (fileNum if given, otherwise ~128 MB per file)
      originDataFrame.repartition(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable(tempTableName)
      sparkSession.table(tempTableName)
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable(table.identifier)
    }
    // optionally clean up the temporary table afterwards:
    // sparkSession.sql(s"DROP TABLE ${tempTableName}")
    Seq(Row(s"compact table ${table.identifier} finished."))
  }
}
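Once the modified Spark build is in place, the command can be exercised end to end. A minimal sketch for spark-shell (the table name and sizes are placeholders):

// write a table as 200 small files
spark.range(0, 100000, 1, 200).write.saveAsTable("test_table")
// merge them into 10 files
spark.sql("COMPACT TABLE test_table INTO 10 FILES").show()
// the table's warehouse directory should now hold roughly 10 data files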