hive jdbc client
客户端执行sql入口statement.execute(sql);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
HiveStatement.java
/**
 * Client-side entry point for running one SQL statement.
 * Submits the statement via runAsyncOnServer, waits via waitForOperationToComplete,
 * and builds a HiveQueryResultSet when a result set is reported.
 *
 * @param sql statement text, forwarded unchanged to runAsyncOnServer
 * @return false when neither the returned status nor stmtHandle reports a result set,
 *         true after resultSet has been built
 * @throws SQLException declared by this method and by runAsyncOnServer
 */
public boolean execute(String sql) throws SQLException {
runAsyncOnServer(sql);
TGetOperationStatusResp status = waitForOperationToComplete();
// The query should be completed by now
// No result set on either the status response or the statement handle: nothing to fetch.
if (!status.isHasResultSet() && !stmtHandle.isHasResultSet()) {
return false;
}
// Build the result set from this statement's client, handle and fetch settings.
resultSet = new HiveQueryResultSet.Builder(this).setClient(client)
.setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
.setScrollable(isScrollableResultset)
.build();
return true;
}
/**
 * Sends the statement to the server as a TExecuteStatementReq with runAsync set to true
 * and stores the returned operation handle in stmtHandle.
 *
 * @param sql statement text placed in the request together with sessHandle
 * @throws SQLException rethrown as-is when the call raises one; any other Exception is
 *         wrapped in a new SQLException carrying SQLState "08S01"
 */
private void runAsyncOnServer(String sql) throws SQLException {
checkConnection("execute");
reInitState();
TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
/**
* Run asynchronously whenever possible
* Currently only a SQLOperation can be run asynchronously,
* in a background operation thread
* Compilation can run asynchronously or synchronously and execution run asynchronously
*/
execReq.setRunAsync(true);
execReq.setConfOverlay(sessConf);
execReq.setQueryTimeout(queryTimeout);
try {
// Thrift round trip; the response status is verified before the handle is kept.
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
stmtHandle = execResp.getOperationHandle();
isExecuteStatementFailed = false;
} catch (SQLException eS) {
// Record the failure and clear the log-generation flag before propagating.
isExecuteStatementFailed = true;
isLogBeingGenerated = false;
throw eS;
} catch (Exception ex) {
// Same bookkeeping as above; non-SQL failures are wrapped so callers only see SQLException.
isExecuteStatementFailed = true;
isLogBeingGenerated = false;
throw new SQLException(ex.toString(), "08S01", ex);
}
}
thrift client侧通过sendBase发送ExecuteStatement给服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
//TCLIService.Iface
/**
 * Client stub for the ExecuteStatement RPC: sends the request, then returns whatever
 * recv_ExecuteStatement yields.
 *
 * @param req request forwarded to send_ExecuteStatement
 * @return the response produced by recv_ExecuteStatement
 * @throws org.apache.thrift.TException declared by this method
 */
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException
{
send_ExecuteStatement(req);
return recv_ExecuteStatement();
}
/**
 * Wraps the request in an ExecuteStatement_args object and hands it to sendBase under the
 * method name "ExecuteStatement".
 *
 * @param req request stored on the args object via setReq
 * @throws org.apache.thrift.TException declared by this method
 */
public void send_ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException
{
ExecuteStatement_args args = new ExecuteStatement_args();
args.setReq(req);
// The method name string is what the server side looks up to pick a handler.
sendBase("ExecuteStatement", args);
}
服务端启动入口类是CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",入口函数是main
1
2
3
// Build the thrift server around the shared SQL context, then init and start it.
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
// init receives the Hive configuration taken from executionHive.
server.init(executionHive.conf)
server.start()
init()会添加两种service,cliService,还有个thriftCLIService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Registers the two child services of the server and initializes them.
 *
 * A SparkSQLCLIService is created, written into the "cliService" field via setSuperField
 * and added as a service. A thrift front-end is then chosen (HTTP or binary, depending on
 * isHTTPTransportMode), written into "thriftCLIService" and added as well. Finally
 * initCompositeService is invoked with the same configuration.
 *
 * @param hiveConf configuration used to choose the transport and passed to initCompositeService
 */
override def init(hiveConf: HiveConf) {
val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
// Both front-ends wrap the same SparkSQLCLIService instance.
val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
new ThriftHttpCLIService(sparkSqlCliService)
} else {
new ThriftBinaryCLIService(sparkSqlCliService)
}
setSuperField(this, "thriftCLIService", thriftCliService)
addService(thriftCliService)
initCompositeService(hiveConf)
}
cliService就是SparkSQLCLIService,thriftCLIService这里会有两种可选择:tcp(binary)模式的ThriftBinaryCLIService,以及http模式的ThriftHttpCLIService
这两个都是对thrift server的封装,按理thrift server服务直接看processor就好了,下面是服务端处理ExecuteStatement请求时的调用栈:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
at org.apache.hive.service.cli.session.SessionManager.submitBackgroundOperation(SessionManager.java:354)
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.runInternal(SparkExecuteStatementOperation.scala:196)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:257)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:388)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:375)
at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:275)
at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:436)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
先看ThriftHttpCLIService,在其run函数里边启动jettyserver,processor是TCLIService.Processor
1
2
3
4
5
6
7
8
9
10
11
// The processor wraps this service instance; the servlet receives it together with the
// protocol factory, the auth type and the two UGIs.
TProcessor processor = new TCLIService.Processor<Iface>(this);
TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
serviceUGI, httpUGI);
// Context handler
final ServletContextHandler context = new ServletContextHandler(
ServletContextHandler.SESSIONS);
context.setContextPath("/");
// The servlet path is derived from the HIVE_SERVER2_THRIFT_HTTP_PATH setting.
String httpPath = getHttpPath(hiveConf
.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
httpServer.setHandler(context);
context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
在TServlet的doPost里可以看到processor的处理逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Handles one HTTP POST as one thrift call: the request body is the input stream, the
 * response body is the output stream, and the configured processor is run once over them.
 *
 * @param request  source of the input stream
 * @param response receives the content type, any custom headers and the output stream
 * @throws ServletException wraps any TException raised while processing
 * @throws IOException declared by this method
 */
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// NOTE(review): the next two locals are not referenced again in this method; presumably
// decompiler residue.
TTransport inTransport = null;
Object var4 = null;
try {
response.setContentType("application/x-thrift");
// Copy every configured custom header onto the response.
if (null != this.customHeaders) {
Iterator i$ = this.customHeaders.iterator();
while(i$.hasNext()) {
Entry<String, String> header = (Entry)i$.next();
response.addHeader((String)header.getKey(), (String)header.getValue());
}
}
InputStream in = request.getInputStream();
OutputStream out = response.getOutputStream();
// One transport over both streams; input and output protocols come from separate factories.
TTransport transport = new TIOStreamTransport(in, out);
TProtocol inProtocol = this.inProtocolFactory.getProtocol(transport);
TProtocol outProtocol = this.outProtocolFactory.getProtocol(transport);
this.processor.process(inProtocol, outProtocol);
out.flush();
} catch (TException var10) {
throw new ServletException(var10);
}
}
ThriftBinaryCLIService里边的不是直接使用的processor 而是processorFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Transport and processor factories both come from hiveAuthFactory; the processor factory
// is built around this service instance.
TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
// Server arguments: binary protocol, an input protocol factory configured with
// maxMessageSize, request timeout in seconds, back-off slot length in milliseconds,
// and the executor service used by the server.
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
.processorFactory(processorFactory).transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
.inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
.requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
.beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executorService);
// TCP Server
server = new TThreadPoolServer(sargs);
server.setServerEventHandler(serverEventHandler);
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
// Start serving with the arguments assembled above.
server.serve();
这个factory也分两种形式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Picks the processor factory according to the configured authentication type.
 *
 * @param service thrift service handed to the chosen factory
 * @return the Kerberos factory when authTypeStr equals the KERBEROS auth name
 *         (case-insensitive), otherwise the plain SASL factory
 * @throws LoginException declared by this method
 */
public TProcessorFactory getAuthProcFactory(ThriftCLIService service) throws LoginException {
if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) {
return KerberosSaslHelper.getKerberosProcessorFactory(saslServer, service);
} else {
return PlainSaslHelper.getPlainProcessorFactory(service);
}
}
//KerberosSaslHelper
/**
 * Creates the processor factory used for Kerberos authentication.
 *
 * @param saslServer SASL server passed to the factory
 * @param service    thrift service passed to the factory
 * @return a new CLIServiceProcessorFactory built from both arguments
 */
public static TProcessorFactory getKerberosProcessorFactory(Server saslServer,
ThriftCLIService service) {
return new CLIServiceProcessorFactory(saslServer, service);
}
//PlainSaslHelper
/**
 * Creates the processor factory used when Kerberos is not selected.
 *
 * @param service thrift service passed to the factory
 * @return a new SQLPlainProcessorFactory wrapping the service
 */
public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService service) {
return new SQLPlainProcessorFactory(service);
}
CLIServiceProcessorFactory与SQLPlainProcessorFactory都继承自TProcessorFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Private constructor that always throws, so the helper class cannot be instantiated.
 *
 * @throws UnsupportedOperationException on every call
 */
private KerberosSaslHelper() {
throw new UnsupportedOperationException("Can't initialize class");
}
/**
 * Processor factory for the Kerberos path: every processor it hands out is a
 * TCLIService.Processor over the service, wrapped by the SASL server.
 */
private static class CLIServiceProcessorFactory extends TProcessorFactory {
private final ThriftCLIService service;
private final Server saslServer;
CLIServiceProcessorFactory(Server saslServer, ThriftCLIService service) {
// No fixed processor is given to the base class; getProcessor below builds one per call.
super(null);
this.service = service;
this.saslServer = saslServer;
}
/**
 * @param trans transport of the connection; not used by this implementation
 * @return a new TCLIService.Processor passed through saslServer.wrapNonAssumingProcessor
 */
@Override
public TProcessor getProcessor(TTransport trans) {
TProcessor sqlProcessor = new TCLIService.Processor<Iface>(service);
return saslServer.wrapNonAssumingProcessor(sqlProcessor);
}
}
/**
 * Processor factory for the non-Kerberos path: every processor it hands out is a
 * TSetIpAddressProcessor over the service.
 */
private static final class SQLPlainProcessorFactory extends TProcessorFactory {
private final ThriftCLIService service;
SQLPlainProcessorFactory(ThriftCLIService service) {
// No fixed processor is given to the base class; getProcessor below builds one per call.
super(null);
this.service = service;
}
/**
 * @param trans transport of the connection; not used by this implementation
 * @return a new TSetIpAddressProcessor wrapping the service
 */
@Override
public TProcessor getProcessor(TTransport trans) {
return new TSetIpAddressProcessor<Iface>(service);
}
}
TSetIpAddressProcessor也是继承自TCLIService.Processor,这里讲了这么多Processor,看下里边具体的定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Fills the given map with one handler object per RPC method name and returns the same map.
 * The keys are the method-name strings that arrive in incoming messages, e.g.
 * "ExecuteStatement".
 *
 * @param processMap map to populate; mutated in place
 * @return the processMap argument after all entries have been added
 */
private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("OpenSession", new OpenSession());
processMap.put("CloseSession", new CloseSession());
processMap.put("GetInfo", new GetInfo());
processMap.put("ExecuteStatement", new ExecuteStatement());
processMap.put("GetTypeInfo", new GetTypeInfo());
processMap.put("GetCatalogs", new GetCatalogs());
processMap.put("GetSchemas", new GetSchemas());
processMap.put("GetTables", new GetTables());
processMap.put("GetTableTypes", new GetTableTypes());
processMap.put("GetColumns", new GetColumns());
processMap.put("GetFunctions", new GetFunctions());
processMap.put("GetOperationStatus", new GetOperationStatus());
processMap.put("CancelOperation", new CancelOperation());
processMap.put("CloseOperation", new CloseOperation());
processMap.put("GetResultSetMetadata", new GetResultSetMetadata());
processMap.put("FetchResults", new FetchResults());
processMap.put("GetDelegationToken", new GetDelegationToken());
processMap.put("CancelDelegationToken", new CancelDelegationToken());
processMap.put("RenewDelegationToken", new RenewDelegationToken());
return processMap;
}
ExecuteStatement应该不陌生,就是前面client提交过来的method了,在Processor的父类TBaseProcessor里还有这么一段代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
 * Reads one message header, looks up the handler registered under the message name and
 * delegates to it. Unknown names are answered with a TApplicationException message.
 *
 * @param in  protocol the message is read from
 * @param out protocol the error reply is written to when no handler exists
 * @return true in both branches
 * @throws TException declared by this method
 */
public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name);
if (fn == null) {
// No handler: skip the payload, then reply with an "Invalid method name" error that
// carries the caller's sequence id.
// NOTE(review): the literals look like inlined Thrift constants (12 = TType.STRUCT,
// 3 = TMessageType.EXCEPTION, 1 = UNKNOWN_METHOD); confirm against the Thrift version in use.
TProtocolUtil.skip(in, (byte)12);
in.readMessageEnd();
TApplicationException x = new TApplicationException(1, "Invalid method name: '" + msg.name + "'");
out.writeMessageBegin(new TMessage(msg.name, (byte)3, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
return true;
} else {
// Handler found: it reads the arguments, calls the service and writes the reply.
fn.process(msg.seqid, in, out, this.iface);
return true;
}
}
//ProcessFunction
/**
 * Runs one RPC: reads the arguments, calls getResult against the service implementation,
 * and writes either the result or an error message to the output protocol.
 *
 * @param seqid sequence id echoed in every message written here
 * @param iprot protocol the arguments are read from
 * @param oprot protocol the reply or error is written to
 * @param iface service implementation passed to getResult
 * @throws TException declared by this method
 */
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
TBase args = this.getEmptyArgsInstance();
try {
args.read(iprot);
} catch (TProtocolException var10) {
// Arguments could not be decoded: finish reading the message and reply with an error
// built from the exception message.
// NOTE(review): 7 and (byte)3 look like inlined Thrift constants (PROTOCOL_ERROR,
// TMessageType.EXCEPTION); confirm against the Thrift version in use.
iprot.readMessageEnd();
TApplicationException x = new TApplicationException(7, var10.getMessage());
oprot.writeMessageBegin(new TMessage(this.getMethodName(), (byte)3, seqid));
x.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
return;
}
iprot.readMessageEnd();
TBase result = null;
try {
// Call into the concrete handler, which invokes the matching method on iface.
result = this.getResult(iface, args);
} catch (TException var9) {
// The handler failed: log it and reply with an "Internal error processing" error.
// NOTE(review): 6 is presumably TApplicationException.INTERNAL_ERROR; confirm.
LOGGER.error("Internal error processing " + this.getMethodName(), var9);
TApplicationException x = new TApplicationException(6, "Internal error processing " + this.getMethodName());
oprot.writeMessageBegin(new TMessage(this.getMethodName(), (byte)3, seqid));
x.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
return;
}
// A reply is written only for calls that are not one-way.
// NOTE(review): (byte)2 is presumably TMessageType.REPLY; confirm.
if (!this.isOneway()) {
oprot.writeMessageBegin(new TMessage(this.getMethodName(), (byte)2, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
}
这个getResult是关键,就是map对应的,这里以上面的ExecuteStatement为例
1
2
3
4
5
6
/**
 * Handler body for the ExecuteStatement RPC: calls ExecuteStatement on the service
 * implementation with the decoded request and stores the response in the result's
 * success field.
 *
 * @param iface service implementation to call
 * @param args  decoded arguments; only args.req is used
 * @return a new ExecuteStatement_result whose success field holds the response
 * @throws org.apache.thrift.TException declared by this method
 */
public ExecuteStatement_result getResult(I iface, ExecuteStatement_args args) throws org.apache.thrift.TException {
ExecuteStatement_result result = new ExecuteStatement_result();
result.success = iface.ExecuteStatement(args.req);
return result;
}
}
iface就是服务端实现了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//ThriftCLIService
/**
 * Server-side implementation of the ExecuteStatement RPC. Unpacks the request, runs the
 * statement through cliService (asynchronously or not, depending on the request flag),
 * and reports success or failure in the response status.
 *
 * @param req request carrying the session handle, statement, conf overlay and runAsync flag
 * @return a response that is always returned; on failure its status is built from the
 *         caught exception
 * @throws TException declared by this method; exceptions raised inside the try block are
 *         caught and converted into the response status
 */
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
TExecuteStatementResp resp = new TExecuteStatementResp();
try {
SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
String statement = req.getStatement();
Map<String, String> confOverlay = req.getConfOverlay();
Boolean runAsync = req.isRunAsync();
// The request flag selects between the asynchronous and the synchronous CLIService call.
OperationHandle operationHandle = runAsync ?
cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
: cliService.executeStatement(sessionHandle, statement, confOverlay);
resp.setOperationHandle(operationHandle.toTOperationHandle());
resp.setStatus(OK_STATUS);
} catch (Exception e) {
// Failures are logged and reported through the response status, not rethrown.
LOG.warn("Error executing statement: ", e);
resp.setStatus(HiveSQLException.toTStatus(e));
}
return resp;
}
//CLIService
/**
 * Looks up the session for the handle and delegates the statement to that session's
 * executeStatementAsync.
 *
 * @param sessionHandle handle resolved through sessionManager.getSession
 * @param statement     statement text passed to the session
 * @param confOverlay   configuration overlay passed to the session
 * @return the operation handle returned by the session
 * @throws HiveSQLException declared by this method
 */
public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay) throws HiveSQLException {
OperationHandle opHandle = sessionManager.getSession(sessionHandle)
.executeStatementAsync(statement, confOverlay);
LOG.debug(sessionHandle + ": executeStatementAsync()");
return opHandle;
}
//SessionManager
/**
 * Resolves a session handle to its session via the handleToSession map.
 *
 * @param sessionHandle key looked up in handleToSession
 * @return the session stored under the handle
 * @throws HiveSQLException when no session is stored under the handle
 */
public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
HiveSession session = handleToSession.get(sessionHandle);
if (session == null) {
throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle);
}
return session;
}
这个cliService不就是init里边放进去的SparkSQLCLIService吗,其父类是CLIService,注意区分前面的TCLIService
这里需要关注下HiveThriftServer2初始化方法init()里的一段代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Registers the two child services of the server and initializes them.
 *
 * The SparkSQLCLIService and the chosen thrift front-end (HTTP or binary, depending on
 * isHTTPTransportMode) are each written into a field via setSuperField and added as a
 * service. The final call, initCompositeService, is the step that initializes them.
 *
 * @param hiveConf configuration used to choose the transport and passed to initCompositeService
 */
override def init(hiveConf: HiveConf) {
val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
new ThriftHttpCLIService(sparkSqlCliService)
} else {
new ThriftBinaryCLIService(sparkSqlCliService)
}
setSuperField(this, "thriftCLIService", thriftCliService)
addService(thriftCliService)
// Initializes every service added above.
initCompositeService(hiveConf)
}
就是最后这个 initCompositeService(hiveConf)
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Mixin for services that must reproduce, through reflection helpers, what the inline
 * comments call `CompositeService.init` and `AbstractService.init`. The self-type
 * restricts it to AbstractService subclasses.
 */
private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
/**
 * Initializes every service in the reflected "serviceList" field with hiveConf, then
 * performs the state transition NOTINITED -> INITED on this service and stores hiveConf.
 *
 * @param hiveConf configuration passed to each child's init and stored on this service
 */
def initCompositeService(hiveConf: HiveConf) {
// Emulating `CompositeService.init(hiveConf)`
// NOTE(review): the numeric argument (2 here, 3 below) presumably selects how many
// superclass levels up the field is declared; confirm against the reflection helpers.
val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
serviceList.asScala.foreach(_.init(hiveConf))
// Emulating `AbstractService.init(hiveConf)`
invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
setAncestorField(this, 3, "hiveConf", hiveConf)
invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
}
}
会通过反射调用serviceList的init(),一开始添加的两个service都会在这里被init,
看下SparkSQLCLIService的init过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Initializes the CLI service: stores hiveConf, installs a SparkSQLSessionManager as the
 * "sessionManager" field and as a child service, logs in from the keytab when security is
 * enabled, then initializes the child services via initCompositeService.
 *
 * @param hiveConf configuration stored on the service, used for the keytab login and
 *                 passed on to initCompositeService
 * @throws ServiceException when the keytab login fails with IOException or LoginException
 */
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
var sparkServiceUGI: UserGroupInformation = null
// The login and the "serviceUGI" field are only set up when Hadoop security is enabled.
if (UserGroupInformation.isSecurityEnabled) {
try {
HiveAuthFactory.loginFromKeytab(hiveConf)
sparkServiceUGI = Utils.getUGI()
setSuperField(this, "serviceUGI", sparkServiceUGI)
} catch {
case e @ (_: IOException | _: LoginException) =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
}
}
initCompositeService(hiveConf)
}
这里同样也会调用SparkSQLSessionManager的init吧,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//SparkSQLSessionManager
/**
 * Initializes the session manager: stores hiveConf, optionally prepares the operation log
 * root directory, creates the background operation thread pool, installs
 * sparkSqlOperationManager as the "operationManager" field and as a child service, then
 * initializes the child services via initCompositeService.
 *
 * @param hiveConf configuration the logging flag and the pool size are read from
 */
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
// Create operation log root directory, if operation logging is enabled
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
invoke(classOf[SessionManager], this, "initOperationLogRootDir")
}
// Fixed-size pool sized by HIVE_SERVER2_ASYNC_EXEC_THREADS, stored in the
// "backgroundOperationPool" field.
val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
getAncestorField[Log](this, 3, "LOG").info(
s"HiveServer2: Async execution pool size $backgroundPoolSize")
setSuperField(this, "operationManager", sparkSqlOperationManager)
addService(sparkSqlOperationManager)
initCompositeService(hiveConf)
}
这个初始化里添加的service是SparkSQLOperationManager,但是SparkSQLOperationManager本身没有init,只有父类有了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private[thriftserver] class SparkSQLOperationManager()
extends OperationManager with Logging {
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
/**
 * Creates a SparkExecuteStatementOperation for the statement and registers it in
 * handleToOperation under its handle. The whole body runs inside `synchronized`.
 *
 * @param parentSession session whose handle is used to look up the SQLContext
 * @param statement     statement text passed to the new operation
 * @param confOverlay   configuration overlay passed to the new operation
 * @param async         caller's request for background execution; honored only when
 *                      HIVE_THRIFT_SERVER_ASYNC is also true in the session's conf
 * @return the newly created operation
 * @throws IllegalArgumentException via `require` when no SQLContext is registered for
 *         the session handle
 */
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
s" initialized or had already closed.")
val conf = sqlContext.sessionState.conf
// Background execution needs both the caller's flag and the server-side setting.
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
logDebug(s"Created Operation for $statement with session=$parentSession, " +
s"runInBackground=$runInBackground")
operation
}
上边的sessionManager应该是SparkSQLSessionManager,这个类里边除了init只有两个方法了:openSession与closeSession,其实还是调用的父类方法,只是封装了下,那获取session就不难找了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Creates a session, opens it with the given configuration and registers it in
 * handleToSession under its handle.
 *
 * @param protocol          protocol version passed to the session constructor
 * @param username          user name passed to the session constructor
 * @param password          password passed to the session constructor
 * @param ipAddress         client address passed to the session constructor
 * @param sessionConf       configuration passed to session.open
 * @param withImpersonation when true, a HiveSessionImplwithUGI wrapped in a proxy is used
 *                          instead of a plain HiveSessionImpl
 * @param delegationToken   token passed to HiveSessionImplwithUGI (impersonation path only)
 * @return the handle of the newly registered session
 * @throws HiveSQLException when session.open fails; the original exception is the cause
 */
public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
Map<String, String> sessionConf, boolean withImpersonation, String delegationToken)
throws HiveSQLException {
HiveSession session;
// If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
// Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
if (withImpersonation) {
HiveSessionImplwithUGI sessionWithUGI = new HiveSessionImplwithUGI(protocol, username, password,
hiveConf, ipAddress, delegationToken);
session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi());
sessionWithUGI.setProxySession(session);
} else {
session = new HiveSessionImpl(protocol, username, password, hiveConf, ipAddress);
}
// The session receives this manager and the operation manager before it is opened.
session.setSessionManager(this);
session.setOperationManager(operationManager);
try {
session.open(sessionConf);
} catch (Exception e) {
// Opening failed: try to close the session (a failure to close is only logged),
// then report the original error.
try {
session.close();
} catch (Throwable t) {
LOG.warn("Error closing session", t);
}
session = null;
throw new HiveSQLException("Failed to open new session: " + e, e);
}
if (isOperationLogEnabled) {
session.setOperationLogSessionDir(operationLogRootDir);
}
// Register the session so getSession can resolve the handle later.
handleToSession.put(session.getSessionHandle(), session);
return session.getSessionHandle();
}
回到之前调用的代码
1
2
3
4
5
6
7
/**
 * Resolves the session for the handle through sessionManager and forwards the statement
 * to the session's own executeStatementAsync.
 *
 * @param sessionHandle handle resolved through sessionManager.getSession
 * @param statement     statement text passed to the session
 * @param confOverlay   configuration overlay passed to the session
 * @return the operation handle returned by the session
 * @throws HiveSQLException declared by this method
 */
public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay) throws HiveSQLException {
OperationHandle opHandle = sessionManager.getSession(sessionHandle)
.executeStatementAsync(statement, confOverlay);
LOG.debug(sessionHandle + ": executeStatementAsync()");
return opHandle;
}
这里就是HiveSessionImpl里的调用了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Session-level asynchronous entry point: delegates to executeStatementInternal with
 * runAsync set to true.
 *
 * @param statement   statement text
 * @param confOverlay configuration overlay
 * @return the handle returned by executeStatementInternal
 * @throws HiveSQLException declared by this method
 */
public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay)
throws HiveSQLException {
return executeStatementInternal(statement, confOverlay, true);
}
/**
 * Creates an execute-statement operation through the operation manager, runs it and
 * records its handle in opHandleSet.
 *
 * @param statement   statement text passed to the new operation
 * @param confOverlay configuration overlay passed to the new operation
 * @param runAsync    flag passed to newExecuteStatementOperation
 * @return the handle of the operation that was run
 * @throws HiveSQLException rethrown after the operation has been closed via the
 *         operation manager
 */
private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
boolean runAsync)
throws HiveSQLException {
// Paired with release(true) in the finally block below.
acquire(true);
OperationManager operationManager = getOperationManager();
ExecuteStatementOperation operation = operationManager
.newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
// The handle is recorded only after run() returned without throwing.
opHandleSet.add(opHandle);
return opHandle;
} catch (HiveSQLException e) {
// Refering to SQLOperation.java,there is no chance that a HiveSQLException throws and the asyn
// background operation submits to thread pool successfully at the same time. So, Cleanup
// opHandle directly when got HiveSQLException
operationManager.closeOperation(opHandle);
throw e;
} finally {
release(true);
}
}
//Operation
/**
 * Template for running an operation: beforeRun, then runInternal, then afterRun.
 * afterRun sits in a finally block, so it also runs when runInternal throws.
 *
 * @throws HiveSQLException declared by this method
 */
public void run() throws HiveSQLException {
beforeRun();
try {
// Subclass-specific work happens here.
runInternal();
} finally {
afterRun();
}
}
这个Operation是啥呢,就是SparkExecuteStatementOperation,绕了一圈这里才是熟悉的sqlContext.sql()
1
2
3
4
5
6
7
8
9
10
11
12
13
override def runInternal(): Unit = {
//...
if (!runInBackground) {
execute()
} else {
//...
}
private def execute(): Unit = {
//...
result = sqlContext.sql(statement)
//...
}