Serverless Spark Java Query SDK 帮助用户更加轻松地通过 Java 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取等。
本文提供了上述功能的示例代码,方便您参考使用。
系统概念:
目前 Serverless Spark支持的地域和API域名如下表所示:
Region(中文名称) | Region | Endpoint |
---|---|---|
华北-北京 | cn-beijing | open.volcengineapi.com |
华东-上海 | cn-shanghai | open.volcengineapi.com |
华南-广州 | cn-guangzhou | open.volcengineapi.com |
亚太东南-柔佛 | ap-southeast-1 | open.volcengineapi.com |
要求:
通过在 Maven 项目中增加依赖项的方式安装 SDK,在 pom.xml 中加入相应依赖即可。
<repository> <id>bytedance-public</id> <name>bytedance maven</name> <url>https://artifact.bytedance.com/repository/releases</url> </repository> <dependency> <groupId>com.volcengine.emr.serverless</groupId> <artifactId>serverless-sdk-query</artifactId> <version>1.0.2</version> </dependency>
Serverless Spark SDK目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:
import com.volcengine.emr.serverless.ServerlessClientOption; import com.volcengine.emr.serverless.ServerlessQueryClient; String ak = "${access_key}"; String sk = "${secret_key}"; String endpoint = "https://open.volcengineapi.com"; String service = "emr_serverless"; String region = "cn-beijing"; int connTimeoutMs = 30000; int socketTimeoutMs = 30000; ServerlessClientOption options = ServerlessClientOption.builder(ak, sk) .endpoint(endpoint) .service(service) .region(region) .connectionTimeoutMs(connTimeoutMs) .socketTimeoutMs(socketTimeoutMs) .build(); ServerlessQueryClient client = new ServerlessQueryClient(options);
ServerlessQueryClient
客户端是后续调用各种Serverless Spark功能的入口,当前版本Client提供如下API接口:
API | 功能 |
---|---|
executeSQL | 执行SQL作业 |
executeJar | 执行Jar作业 |
cancelJob | 取消任务实例 |
getJob | 获取任务实例状态 |
rerunJob | 重跑作业实例 |
getResult | 获取作业结果 |
getResultUrl | 获取作业结果集链接 |
getTrackingUrl | 获取作业UI链接 |
getDriverLog | 获取作业driver日志 |
getSubmissionLog | 获取作业提交日志 |
listQueues | 获取队列列表 |
初始化Client完成后,可通过执行相关Task(目前支持SparkSQL,SparkJar两种任务类型)来进行任务执行。
如下为一个进行简单SQL查询的例子:
import com.volcengine.emr.serverless.SQLTask; import com.volcengine.emr.serverless.Job; import com.volcengine.emr.serverless.JobStatus; String query = "select 1"; String name = "first_query_task"; // 定义任务 SQLTask sqlTask = SQLTask.builder(query) .name(name) .sync(true) .build(); // 执行任务 Job job = client.executeSQL(sqlTask);
本节将以代码示例的形式展示更多Serverless Spark功能的使用方式。
import com.volcengine.emr.serverless.SQLTask; import com.volcengine.emr.serverless.Job; import com.volcengine.emr.serverless.JobStatus; String sql = "${your_sql_statement}"; // 待执行的sql String taskName = "${your_task_name}"; // SQL作业名 String queueName = "${your_queue_name}"; // 需要使用的队列,可选参数,不填将自动选取 SQLTask sqlTask = SQLTask.builder(sql) .name(taskName) .queue(queueName) // 用于传入一些任务参数,eg. spark.sql.shuffle.partitions -> 2000 .addConf("spark.sql.xxx", "xxx") .sync(true) // 同步执行直至任务完成 .build(); // 执行任务 Job job = client.executeSQL(sqlTask); assertEquals(JobStatus.COMPLETED.status(), job.getStatus());
支持Spark3.5语法
同步/异步任务提交的差异请参照 5.1.3 同步/异步执行
SparkJar任务为用户提供了通过编写Spark应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。
示例:
import com.volcengine.emr.serverless.JarTask; import com.volcengine.emr.serverless.Job; import com.volcengine.emr.serverless.JobStatus; String jarResource = "${tos_path_to_jar_resource}"; // 待执行的jar资源的tos路径,例如 tos://bucket/path/to/spark_pi_example.jar String mainClass = "${main_class_reference}"; // jar任务的主类,例如 org.apache.spark.examples.SparkPi List<String> mainArgs = ImmutableList.of("${param1}", "${param_2}"); // jar任务的主类参数 String taskName = "${your_task_name}"; // Jar作业名 String queueName = "${your_queue_name}"; // 需要使用的队列,可选参数,不填将自动选取 JarTask jarTask = JarTask.builder(jarResource, mainClass) .name(taskName) .queue(queueName) .mainArgs(mainArgs) // 用于传入一些任务参数,eg. spark.sql.shuffle.partitions -> 2000 .addConf("spark.sql.xxx", "xxx") .sync(true) // 同步执行直至任务完成 .build(); // 执行任务 Job job = client.executeJar(jarTask); assertEquals(JobStatus.COMPLETED.status(), job.getStatus());
通过配置 Task 的 sync 参数进行控制:
// 同步,阻塞直至任务状态变为 成功/失败/取消 等终止状态 SQLTask task = SQLTask.builder(query) .sync(true) .build(); // 异步,立刻返回任务实例当前的状态 SQLTask task = SQLTask.builder(query) .sync(false) .build(); // 后续可通过自定义逻辑定期检查最新job状态进行处理 /* 例如下面例子,将阻塞等待直至任务完成 public static void waitForJob(ServerlessQueryClient client, String jobId, Function<Job, Boolean> condition ) { Job job = client.getJob(jobId); try { while (condition.apply(job)) { Thread.sleep(5 * 1000L); job = client.getJob(jobId); } } catch (InterruptedException e) { log.error("InterruptedException", e); } } waitForJob(client, job.getId(), jb -> { JobStatus jobStatus = JobStatus.fromString(jb.getStatus()); return !JobStatus.isFinished(jobStatus); }); */
// 直接传入任务id进行取消 client.cancelJob("${jobId}");
可以根据任务ID进行任务实例的获取:
// 直接传入任务id获取该任务当前的实例状态 client.getJob("${jobId}");
从拿到的任务实例获取任务对应的 Spark UI 页面:
// 如果没有产生界面会报异常,所以使用的时候建议用try catch环绕 client.getTrackingUrl("${jobId}");
对job实例调用 getResult()
获取任务的查询结果:
import com.volcengine.emr.serverless.ResultCursor; // 直接传入任务id获取该任务的ResultCursor ResultCursor resultCursor = client.getResult("${jobId}"); // 输出结果集header List<String[]> resultHeader = resultCursor.getHeader(); for (String[] field : resultHeader) { System.out.println(Arrays.toString(field)); } // 输出结果集 while (resultCursor.hasNext()) { resultCursor.fetchNextPage(); List<List<String>> currentRows = resultCursor.getCurrentRows(); for (List<String> row : currentRows) { System.out.println(row); } }
也可以直接调用getResultUrl()
获取任务的查询结果的下载链接:
// 通过传入任务Id获取结果集下载链接 String resultUrl = client.getResultUrl("${jobId}"); System.out.println(resultUrl);
根据任务id获取提交日志
String jobId = "${job_id}"; LogCursor logCursor = client.getSubmissionLog(jobId); List<String> logRows = new ArrayList<>(); while (logCursor.hasNext()) { logCursor.fetchNextPage(); List<String> currentRows = logCursor.getCurrentRows(); logList.addAll(currentRows); }
根据任务id获取执行日志
String jobId = "${job_id}"; LogCursor logCursor = client.getDriverLog(jobId); List<String> logRows = new ArrayList<>(); while (logCursor.hasNext()) { logCursor.fetchNextPage(); List<String> currentRows = logCursor.getCurrentRows(); logList.addAll(currentRows); }
调用listQueues()
可获取所有当前用户的可执行队列名:
List<String> executableQueues = client.listQueues();
任务异常将会以 com.volcengine.emr.serverless.QuerySdkException
(unchecked exception,无需显式 catch)的形式进行抛出,exception message 内携带具体的执行错误信息。