Serverless Spark Java Query SDK 帮助用户更加轻松地通过 Java 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取等。
本文提供了上述功能的示例代码,方便您参考使用。
系统概念:
目前 Serverless Spark支持的地域和API域名如下表所示:
Region(中文名称) | Region | Endpoint |
---|---|---|
华北-北京 | cn-beijing | open.volcengineapi.com |
华东-上海 | cn-shanghai | open.volcengineapi.com |
华南-广州 | cn-guangzhou | open.volcengineapi.com |
亚太东南-柔佛 | ap-southeast-1 | open.volcengineapi.com |
要求:
通过在 Maven 项目中增加依赖项的方式安装 SDK,在 pom.xml 中加入相应依赖即可。
<repository>
<id>bytedance-public</id>
<name>bytedance maven</name>
<url>https://artifact.bytedance.com/repository/releases</url>
</repository>
<dependency>
<groupId>com.volcengine.emr.serverless</groupId>
<artifactId>serverless-sdk-query</artifactId>
<version>1.0.2</version>
</dependency>
Serverless Spark SDK目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:
import com.volcengine.emr.serverless.ServerlessClientOption;
import com.volcengine.emr.serverless.ServerlessQueryClient;
String ak = "${access_key}";
String sk = "${secret_key}";
String endpoint = "https://open.volcengineapi.com";
String service = "emr_serverless";
String region = "cn-beijing";
int connTimeoutMs = 30000;
int socketTimeoutMs = 30000;
ServerlessClientOption options = ServerlessClientOption.builder(ak, sk)
.endpoint(endpoint)
.service(service)
.region(region)
.connectionTimeoutMs(connTimeoutMs)
.socketTimeoutMs(socketTimeoutMs)
.build();
ServerlessQueryClient client = new ServerlessQueryClient(options);
ServerlessQueryClient
客户端是后续调用各种Serverless Spark功能的入口,当前版本Client提供如下API接口:
API | 功能 |
---|---|
executeSQL | 执行SQL作业 |
executeJar | 执行Jar作业 |
cancelJob | 取消任务实例 |
getJob | 获取任务实例状态 |
rerunJob | 重跑作业实例 |
getResult | 获取作业结果 |
getResultUrl | 获取作业结果集链接 |
getTrackingUrl | 获取作业UI链接 |
getDriverLog | 获取作业driver日志 |
getSubmissionLog | 获取作业提交日志 |
listQueues | 获取队列列表 |
初始化Client完成后,可通过执行相关Task(目前支持SparkSQL,SparkJar两种任务类型)来进行任务执行。
如下为一个进行简单SQL查询的例子:
import com.volcengine.emr.serverless.SQLTask;
import com.volcengine.emr.serverless.Job;
import com.volcengine.emr.serverless.JobStatus;
String query = "select 1";
String name = "first_query_task";
// 定义任务
SQLTask sqlTask = SQLTask.builder(query)
.name(name)
.sync(true)
.build();
// 执行任务
Job job = client.executeSQL(sqlTask);
本节将以代码示例的形式展示更多Serverless Spark功能的使用方式。
import com.volcengine.emr.serverless.SQLTask;
import com.volcengine.emr.serverless.Job;
import com.volcengine.emr.serverless.JobStatus;
String sql = "${your_sql_statement}"; // 待执行的sql
String taskName = "${your_task_name}"; // SQL作业名
String queueName = "${your_queue_name}"; // 需要使用的队列,可选参数,不填将自动选取
SQLTask sqlTask = SQLTask.builder(sql)
.name(taskName)
.queue(queueName)
// 用于传入一些任务参数,eg. spark.sql.shuffle.partitions -> 2000
.addConf("serverless.compute.group.name", "xxx") // 计算组名,可选任务,默认选用Default计算组
.addConf("spark.sql.xxx", "xxx")
.sync(true) // 同步执行直至任务完成
.build();
// 执行任务
Job job = client.executeSQL(sqlTask);
assertEquals(JobStatus.COMPLETED.status(), job.getStatus());
支持Spark3.5语法
同步/异步任务提交的差异请参照 5.1.3 同步/异步执行
SparkJar任务为用户提供了通过编写Spark应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。
示例:
import com.volcengine.emr.serverless.JarTask;
import com.volcengine.emr.serverless.Job;
import com.volcengine.emr.serverless.JobStatus;
String jarResource = "${tos_path_to_jar_resource}"; // 待执行的jar资源的tos路径,例如 tos://bucket/path/to/spark_pi_example.jar
String mainClass = "${main_class_reference}"; // jar任务的主类,例如 org.apache.spark.examples.SparkPi
List<String> mainArgs = ImmutableList.of("${param1}", "${param_2}"); // jar任务的主类参数
String taskName = "${your_task_name}"; // Jar作业名
String queueName = "${your_queue_name}"; // 需要使用的队列,可选参数,不填将自动选取
JarTask jarTask = JarTask.builder(jarResource, mainClass)
.name(taskName)
.queue(queueName)
.mainArgs(mainArgs)
// 用于传入一些任务参数,eg. spark.sql.shuffle.partitions -> 2000
.addConf("spark.sql.xxx", "xxx")
.sync(true) // 同步执行直至任务完成
.build();
// 执行任务
Job job = client.executeJar(jarTask);
assertEquals(JobStatus.COMPLETED.status(), job.getStatus());
通过配置 Task 的 sync 参数进行控制:
// 同步,阻塞直至任务状态变为 成功/失败/取消 等终止状态
SQLTask task = SQLTask.builder(query)
.sync(true)
.build();
// 异步,立刻返回任务实例当前的状态
SQLTask task = SQLTask.builder(query)
.sync(false)
.build();
// 后续可通过自定义逻辑定期检查最新job状态进行处理
/*
例如下面例子,将阻塞等待直至任务完成
public static void waitForJob(ServerlessQueryClient client,
String jobId, Function<Job, Boolean> condition
) {
Job job = client.getJob(jobId);
try {
while (condition.apply(job)) {
Thread.sleep(5 * 1000L);
job = client.getJob(jobId);
}
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
}
waitForJob(client, job.getId(), jb -> {
JobStatus jobStatus = JobStatus.fromString(jb.getStatus());
return !JobStatus.isFinished(jobStatus);
});
*/
// 直接传入任务id进行取消
client.cancelJob("${jobId}");
可以根据任务ID进行任务实例的获取:
// 直接传入任务id获取该任务当前的实例状态
client.getJob("${jobId}");
从拿到的任务实例获取任务对应的 Spark UI 页面:
// 如果没有产生界面会报异常,所以使用的时候建议用try catch环绕
client.getTrackingUrl("${jobId}");
对job实例调用 getResult()
获取任务的查询结果:
import com.volcengine.emr.serverless.ResultCursor;
// 直接传入任务id获取该任务的ResultCursor
ResultCursor resultCursor = client.getResult("${jobId}");
// 输出结果集header
List<String[]> resultHeader = resultCursor.getHeader();
for (String[] field : resultHeader) {
System.out.println(Arrays.toString(field));
}
// 输出结果集
while (resultCursor.hasNext()) {
resultCursor.fetchNextPage();
List<List<String>> currentRows = resultCursor.getCurrentRows();
for (List<String> row : currentRows) {
System.out.println(row);
}
}
也可以直接调用getResultUrl()
获取任务的查询结果的下载链接:
// 通过传入任务Id获取结果集下载链接
String resultUrl = client.getResultUrl("${jobId}");
System.out.println(resultUrl);
根据任务id获取提交日志
String jobId = "${job_id}";
LogCursor logCursor = client.getSubmissionLog(jobId);
List<String> logRows = new ArrayList<>();
while (logCursor.hasNext()) {
logCursor.fetchNextPage();
List<String> currentRows = logCursor.getCurrentRows();
logList.addAll(currentRows);
}
根据任务id获取执行日志
String jobId = "${job_id}";
LogCursor logCursor = client.getDriverLog(jobId);
List<String> logRows = new ArrayList<>();
while (logCursor.hasNext()) {
logCursor.fetchNextPage();
List<String> currentRows = logCursor.getCurrentRows();
logList.addAll(currentRows);
}
调用listQueues()
可获取所有当前用户的可执行队列名:
List<String> executableQueues = client.listQueues();
任务异常将会以 com.volcengine.emr.serverless.QuerySdkException
(unchecked exception,无需显式 catch)的形式进行抛出,exception message 内携带具体的执行错误信息。