You need to enable JavaScript to run this app.
导航
Java Query SDK
最近更新时间:2024.07.18 16:50:20首次发布时间:2024.05.15 19:49:44

1 简介

Serverless Spark Java Query SDK 帮助用户更加轻松地通过 Java 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取等。
本文提供了上述功能的示例代码,方便您参考使用。

2 概念说明

系统概念:

  • Endpoint:表示 Serverless Spark 对外服务的 API 域名
  • Region:表示 Serverless Spark 的数据中心所在的物理区域

目前 Serverless Spark支持的地域和API域名如下表所示:

Region(中文名称)

Region

Endpoint

华北-北京

cn-beijing

open.volcengineapi.com

华东-上海

cn-shanghai

open.volcengineapi.com

华南-广州

cn-guangzhou

open.volcengineapi.com

亚太东南-柔佛

ap-southeast-1

open.volcengineapi.com

  • Access Key / Secret Access Key:访问火山引擎 API 的密钥;用户可以通过火山引擎的“密钥管理”页面获取到 Access Key 和 Secret Access Key

3 安装 SDK

要求:

  • Java 1.8 或更高版本
  • Maven

通过在 Maven 项目中增加依赖项的方式安装 SDK,在 pom.xml 中加入相应依赖即可。

<repository>
    <id>bytedance-public</id>
    <name>bytedance maven</name>
    <url>https://artifact.bytedance.com/repository/releases</url>
</repository>

<dependency>
  <groupId>com.volcengine.emr.serverless</groupId>
  <artifactId>serverless-sdk-query</artifactId>
  <version>1.0.2</version>
</dependency>

4 快速入门

4.1 初始化客户端

Serverless Spark SDK目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:

import com.volcengine.emr.serverless.ServerlessClientOption;
import com.volcengine.emr.serverless.ServerlessQueryClient;

String ak = "${access_key}";
String sk = "${secret_key}";
String endpoint = "https://open.volcengineapi.com";
String service = "emr_serverless";
String region = "cn-beijing";
int connTimeoutMs = 30000;
int socketTimeoutMs = 30000;

ServerlessClientOption options = ServerlessClientOption.builder(ak, sk)
  .endpoint(endpoint)
  .service(service)
  .region(region)
  .connectionTimeoutMs(connTimeoutMs)
  .socketTimeoutMs(socketTimeoutMs)
  .build();

ServerlessQueryClient client = new ServerlessQueryClient(options);

ServerlessQueryClient 客户端是后续调用各种Serverless Spark功能的入口,当前版本Client提供如下API接口:

API

功能

executeSQL

执行SQL作业

executeJar

执行Jar作业

cancelJob

取消任务实例

getJob

获取任务实例状态

rerunJob

重跑作业实例

getResult

获取作业结果

getResultUrl

获取作业结果集链接

getTrackingUrl

获取作业UI链接

getDriverLog

获取作业driver日志

getSubmissionLog

获取作业提交日志

listQueues

获取队列列表

4.2 第一个查询

初始化Client完成后,可通过执行相关Task(目前支持SparkSQL,SparkJar两种任务类型)来进行任务执行。
如下为一个进行简单SQL查询的例子:

import com.volcengine.emr.serverless.SQLTask;
import com.volcengine.emr.serverless.Job;
import com.volcengine.emr.serverless.JobStatus;

String query = "select 1";
String name = "first_query_task";

// 定义任务
SQLTask sqlTask = SQLTask.builder(query)
  .name(name)
  .sync(true)
  .build();

// 执行任务
Job job = client.executeSQL(sqlTask);

5 更多示例

本节将以代码示例的形式展示更多Serverless Spark功能的使用方式。

5.1 提交/取消任务

5.2 提交 SQL 任务

import com.volcengine.emr.serverless.SQLTask;
import com.volcengine.emr.serverless.Job;
import com.volcengine.emr.serverless.JobStatus;

String sql = "${your_sql_statement}";  // 待执行的sql
String taskName = "${your_task_name}";  // SQL作业名
String queueName = "${your_queue_name}";  // 需要使用的队列,可选参数,不填将自动选取
SQLTask sqlTask = SQLTask.builder(sql)
  .name(taskName)
  .queue(queueName)
  // 用于传入一些任务参数,eg. spark.sql.shuffle.partitions -> 2000
  .addConf("spark.sql.xxx", "xxx")
  .sync(true) // 同步执行直至任务完成
  .build();

// 执行任务
Job job = client.executeSQL(sqlTask);

assertEquals(JobStatus.COMPLETED.status(), job.getStatus());

支持Spark3.5语法
同步/异步任务提交的差异请参照 5.1.3 同步/异步执行

5.3 提交 SparkJar 任务

SparkJar任务为用户提供了通过编写Spark应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。
示例:

import com.volcengine.emr.serverless.JarTask;
import com.volcengine.emr.serverless.Job;
import com.volcengine.emr.serverless.JobStatus;


String jarResource = "${tos_path_to_jar_resource}";  // 待执行的jar资源的tos路径,例如 tos://bucket/path/to/spark_pi_example.jar
String mainClass = "${main_class_reference}";  // jar任务的主类,例如 org.apache.spark.examples.SparkPi
List<String> mainArgs = ImmutableList.of("${param1}", "${param_2}");  // jar任务的主类参数
String taskName = "${your_task_name}";  // Jar作业名
String queueName = "${your_queue_name}";  // 需要使用的队列,可选参数,不填将自动选取
JarTask jarTask = JarTask.builder(jarResource, mainClass)
  .name(taskName)
  .queue(queueName)
  .mainArgs(mainArgs)
  // 用于传入一些任务参数,eg. spark.sql.shuffle.partitions -> 2000
  .addConf("spark.sql.xxx", "xxx")
  .sync(true) // 同步执行直至任务完成
  .build();

// 执行任务
Job job = client.executeJar(jarTask);

assertEquals(JobStatus.COMPLETED.status(), job.getStatus());

5.4 同步/异步执行

通过配置 Task 的 sync 参数进行控制:

// 同步,阻塞直至任务状态变为 成功/失败/取消 等终止状态
SQLTask task = SQLTask.builder(query)
  .sync(true)
  .build();

// 异步,立刻返回任务实例当前的状态
SQLTask task = SQLTask.builder(query)
  .sync(false)
  .build();
// 后续可通过自定义逻辑定期检查最新job状态进行处理
/*
例如下面例子,将阻塞等待直至任务完成

public static void waitForJob(ServerlessQueryClient client,
  String jobId, Function<Job, Boolean> condition
) {
  Job job = client.getJob(jobId);
  try {
    while (condition.apply(job)) {
      Thread.sleep(5 * 1000L);
      job = client.getJob(jobId);
    }
  } catch (InterruptedException e) {
    log.error("InterruptedException", e);
  }
}

waitForJob(client, job.getId(), jb -> {
  JobStatus jobStatus = JobStatus.fromString(jb.getStatus());
  return !JobStatus.isFinished(jobStatus);
});

*/

5.5 取消任务

// 直接传入任务id进行取消
client.cancelJob("${jobId}");

6 查看任务实例相关信息

6.1 获取任务实例

可以根据任务ID进行任务实例的获取:

// 直接传入任务id获取该任务当前的实例状态
client.getJob("${jobId}");

6.2 获取引擎侧任务执行 UI

从拿到的任务实例获取任务对应的 Spark UI 页面:

// 如果没有产生界面会报异常,所以使用的时候建议用try catch环绕
client.getTrackingUrl("${jobId}");

7 获取查询结果

对job实例调用 getResult() 获取任务的查询结果:

import com.volcengine.emr.serverless.ResultCursor;

// 直接传入任务id获取该任务的ResultCursor
ResultCursor resultCursor = client.getResult("${jobId}");

// 输出结果集header
List<String[]> resultHeader = resultCursor.getHeader();
for (String[] field : resultHeader) {
  System.out.println(Arrays.toString(field));
}

// 输出结果集
while (resultCursor.hasNext()) {
  resultCursor.fetchNextPage();
  List<List<String>> currentRows = resultCursor.getCurrentRows();
  for (List<String> row : currentRows) {
    System.out.println(row);
  }
}

也可以直接调用getResultUrl() 获取任务的查询结果的下载链接:

// 通过传入任务Id获取结果集下载链接
String resultUrl = client.getResultUrl("${jobId}");
System.out.println(resultUrl);

8 获取任务日志

8.1 获取提交日志

根据任务id获取提交日志

String jobId = "${job_id}";
LogCursor logCursor = client.getSubmissionLog(jobId);

List<String> logRows = new ArrayList<>();
while (logCursor.hasNext()) {
    logCursor.fetchNextPage();
    List<String> currentRows = logCursor.getCurrentRows();
    logList.addAll(currentRows);
}

8.2 获取执行日志

根据任务id获取执行日志

String jobId = "${job_id}";
LogCursor logCursor = client.getDriverLog(jobId);

List<String> logRows = new ArrayList<>();
while (logCursor.hasNext()) {
    logCursor.fetchNextPage();
    List<String> currentRows = logCursor.getCurrentRows();
    logList.addAll(currentRows);
}

9 获取队列列表

调用listQueues() 可获取所有当前用户的可执行队列名:

List<String> executableQueues = client.listQueues();

10 执行异常

任务异常将会以 com.volcengine.emr.serverless.QuerySdkException(unchecked exception,无需显式 catch)的形式进行抛出,exception message 内携带具体的执行错误信息。