对于一个平台来说,使用者对技术本身是不敏感的,所以我们需要增加一些限制来减少集群的一些不可控情况,例如不断的写入新表/新数据却不记得删除,大量不按规范创建的表名等情况。与此同时应尽量让技术对用户透明,比如让其无感知的访问多种类型的数据库。
下文以拦截 spark.sql() 方法为例,通过为 hive表的添加生命周期,自动切换 tidb 表,表权限校验等几个小功能 来说明。
如何使用
代码SparkSqlAspect.scala为了便于理解以下代码会进行一些删减import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{Around, Aspect}import org.slf4j.LoggerFactoryimport org.apache.spark.sql.{Dataset, Row, SparkSession, TiContext}import cn.tongdun.datacompute.parser._
import cn.tongdun.datacompute.parser.spark.SparkSQLHelper@Aspect
class SparkSqlAspect { private val logger = LoggerFactory.getLogger(classOf[SparkSqlAspect]) private var tiContext: TiContext = null@Around("execution(public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> org.apache.spark.sql.SparkSession.sql(java.lang.String)) && args(sqlRaw)")
def around(pjp: ProceedingJoinPoint, sqlRaw: String): Dataset[Row] www.yongshiyule178.com= { //sparkSQLHelper 是我们基于 antlr4 增加了一些 sparksql 语法的支持,例如建表时需要指定 lifecycle 等 val sql = SparkSQLHelper.format(sqlRaw) val spark = pjp.getThis.asInstanceOf[SparkSession] var dataset: Dataset[Row] = spark.emptyDataFrame val statementData = SparkSQLHelper.getStatementData(sql) val statement = statementData.getStatement()//getType 方法用于获取sql的类型
statementData.getType match { case StatementType.CREATE_TABLE => createMethod()case StatementType.CREATE_TABLE_AS_SELECT =>
createAsSelectMethod()case StatementType.SELECT =>
dataset = selectMethod(spark, inputSql, statement, pjp)case _ =>
dataset = pjp.proceed(pjp.getArgs).asInstanceOf[Dataset[Row]] } dateset }// 建表必须带有 lifecycle 字段,并对表名进行校验,将相关信息注册到元数据系统等操作
def createMethod(): Unit = { ... }// 约定 create table as select 生成的表都为中间表,必须以 tdl_ 开头,lifecycle 固定为7天
def createAsSelectMethod(): Unit = { ... }// select 对多个数据库源进行判定以及对权限进行校验,下面以tidb为例
def selectMethod(spark: SparkSession, inputSql: String, statement: Statement, pjp: ProceedingJoinPoint)www.mhylpt.com/: Dataset[Row] = { val tableData = statement.asInstanceOf[TableData] //获取所有需要访问的源表 tableData.getInputTables.toArray.foreach { case t: TableSource => val databaseName = t.getDatabaseName val tableName = t.getTableName val fullTableName = databaseName + "." + tableName //所有tidb的库都以tidb为前缀 if (t.getDatabaseName.startsWith("tidb")) { //对tidb表权限进行校验 if(tableAuthCheck(...)){ //判断tiContext是否初始化 if (tiContext == null) { tiContext = new TiContext(spark) } //对tidb表的表名进行替换,避免与现有的临时表/中间表冲突 val replacedTable = "tdl_" + databaseName + "_" + tableName //加入tidb表数据源 tiContext.tidbMapTable(databaseName, tableName) //注册为临时表 tiContext.getDataFrame(databaseName, tableName).createOrReplaceTempView(replacedTable) //将sql语句中的表名进行替换 sql = StringUtils.replace(sql,www.dasheng178.com fullTableName, replacedTable) } else { throw new IllegalAccessError(fullTableName + "没有访问权限") } } case _ => } pjp.proceed(Array(sql)www.thd540.com).asInstanceOf[Dataset[Row]]配置
pom.xml<dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> <version>1.9.1</version> </dependency><dependency>
<groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.9.1</version> </dependency><!--公司内部版本,用于支持spark2.3-->
<dependency> <groupId>com.pingcap.tispark</groupId> <artifactId>tispark-core</artifactId> <version>1.1-SNAPSHOT</version> <scope>provided</scope> </dependency>resources/META-INF/AspectSql.aj<?xml version="1.0" encoding=www.taohuayuan178.com/"UTF-8" ?><aspectj> <aspects> <aspect name="cn.tongdun.aspectj.SparkSqlAspect"/> </aspects> <weaver options="-Xset:weaveJavaxPackages=true"/></aspectj>spark-defaults.confspark.driver.extraClassPath /path/to/spark-aspectj.jarspark.driver.extraJavaOptions -javaagent:/home/admin/aspectjweaver-1.9.1.jar结语通过上述的操作,在用户调用 spark.sql(...) 时将会触发相应的方法。hdfs/rdd/sparkSession/etc. 操作同理可实现。不同公司面临的真实场景各有不同,因此并没有过多的实现细节,仅给需要的同学提供一些思路