Flink DataSail Connector is a connector provided by the DataSail service for integrating with Apache Flink. It lets you use Flink to produce data to, or consume data from, DataSail datasets.
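Make sure you have activated the services you need to access. You can activate the Full-Domain Data Integration (DataSail) service in the Volcengine console; see Service Activation for details.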
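An Access Key is the security credential used to access Volcengine services. It consists of two parts: the Access Key ID (AK) and the Secret Access Key (SK). Log in to the Volcengine console and open Access Keys under Access Control to create and manage your Access Keys. For more information, see the Access Key help documentation.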
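The examples in this document pass the AK and SK to the config builders as string literals. If you would rather not hardcode credentials, one option is to read them from environment variables. The following is a minimal sketch of that approach. The variable names DATASAIL_ACCESS_KEY and DATASAIL_SECRET_KEY and the Credentials class are hypothetical and are not part of the connector.

```java
import java.util.Map;

/** Hypothetical helper: loads the AK/SK pair from environment variables. */
public final class Credentials {
    // Hypothetical variable names; use whatever your deployment defines
    static final String AK_VAR = "DATASAIL_ACCESS_KEY";
    static final String SK_VAR = "DATASAIL_SECRET_KEY";

    final String accessKey;
    final String secretKey;

    private Credentials(String accessKey, String secretKey) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    // Takes the environment as a Map so it can be exercised without real env vars
    static Credentials fromEnv(Map<String, String> env) {
        String ak = env.get(AK_VAR);
        String sk = env.get(SK_VAR);
        if (ak == null || ak.isEmpty() || sk == null || sk.isEmpty()) {
            throw new IllegalStateException("Both " + AK_VAR + " and " + SK_VAR + " must be set");
        }
        return new Credentials(ak, sk);
    }

    // Keeps the first 4 characters of longer values and masks the rest, for safe logging
    static String mask(String secret) {
        int keep = secret.length() > 4 ? 4 : 0;
        StringBuilder sb = new StringBuilder(secret.substring(0, keep));
        for (int i = keep; i < secret.length(); i++) {
            sb.append('*');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Credentials creds = fromEnv(System.getenv());
        System.out.println("Using access key " + mask(creds.accessKey));
    }
}
```

The loaded values could then be passed to the builders with `.accessKey(creds.accessKey)` and `.secretKey(creds.secretKey)`.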
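Confirm that the dataset you want to produce to or consume from has already been created in DataSail.
Java 1.8 or later is required.
Flink 1.11 or later is required.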
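You may want to check these minimum versions at startup. Below is a minimal sketch of such a check. The VersionCheck class is hypothetical. It compares dotted versions numerically, one component at a time, because a plain string comparison would wrongly rank "1.9" above "1.11". Java reports its version through the standard `java.specification.version` system property ("1.8" on Java 8, "11" or "17" on later releases). How you obtain the Flink version string is left open here.

```java
/** Hypothetical startup check for the Java 1.8 / Flink 1.11 minimums. */
public final class VersionCheck {
    // Parses the leading digits of each dot-separated component:
    // "1.11.3" -> [1, 11, 3], "1.11-SNAPSHOT" -> [1, 11]; no leading digits counts as 0
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        int[] nums = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            int end = 0;
            while (end < p.length() && Character.isDigit(p.charAt(end))) {
                end++;
            }
            nums[i] = end == 0 ? 0 : Integer.parseInt(p.substring(0, end));
        }
        return nums;
    }

    // Numeric component-wise comparison; missing trailing components count as 0
    static boolean isAtLeast(String actual, String required) {
        int[] a = parse(actual);
        int[] r = parse(required);
        for (int i = 0; i < Math.max(a.length, r.length); i++) {
            int x = i < a.length ? a[i] : 0;
            int y = i < r.length ? r[i] : 0;
            if (x != y) {
                return x > y;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String javaVersion = System.getProperty("java.specification.version");
        System.out.println("Java " + javaVersion + " >= 1.8: " + isAtLeast(javaVersion, "1.8"));
    }
}
```

This scheme happens to handle both Java's old ("1.8") and new ("11") numbering without special cases, because every post-8 release number is greater than 1.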
Install the connector artifacts into your local Maven repository:

```shell
mvn install:install-file -Dfile=datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=datasail-subscriber-java-cloud -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=flink-connector-datasail-parent-1.0.0-SNAPSHOT.pom -DgroupId=com.volcengine.datasail -DartifactId=flink-connector-datasail-parent -Dversion=1.0.0-SNAPSHOT -Dpackaging=pom
mvn install:install-file -Dfile=flink-connector-datasail-1.0.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=flink-connector-datasail -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar
```
Then declare the dependency in your project's pom.xml:

```xml
<dependency>
    <groupId>com.volcengine.datasail</groupId>
    <artifactId>flink-connector-datasail</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
```
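Source (consumer) configuration parameters:

Parameter | Type | Description | Example |
---|---|---|---|
accessKey | string | Volcengine Access Key | AKLTZW***** |
secretKey | string | Volcengine Secret Key | TW1KaVlURmlaR0******* |
dataset | string | DataSail dataset | byteio_dataset_test |
network | enum | Network type used for consumption (public or private network) | NETWORK_EXTERNAL |
subscribeCenter | string | Domain of the data subscription configuration center | https://datasail01-cn-beijing.volceapplog.com/ |
consumerGroup | string | Consumer group | test |
autoOffsetReset | enum | Where to start consuming when the consumer group has no initial offset | earliest |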
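The following plain-Java sketch illustrates how autoOffsetReset is meant to apply. It models only the rule itself and assumes Kafka-style semantics: if the consumer group has a committed offset, consumption resumes from there, and autoOffsetReset decides the start position only when no such offset exists. The value "latest" and the class and method names are assumptions for illustration. The table above only confirms "earliest".

```java
import java.util.OptionalLong;

/** Hypothetical model of the autoOffsetReset rule; not the connector's implementation. */
public final class OffsetResolution {
    static long resolveStartOffset(OptionalLong committed, String autoOffsetReset,
                                   long earliestAvailable, long latest) {
        // A committed offset always wins: the reset policy is ignored
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: fall back to the configured policy
        switch (autoOffsetReset) {
            case "earliest":
                return earliestAvailable; // replay everything still retained
            case "latest":
                return latest; // assumed value: only records arriving from now on
            default:
                throw new IllegalArgumentException("Unknown autoOffsetReset: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A new consumer group (nothing committed yet) with autoOffsetReset = earliest
        long start = resolveStartOffset(OptionalLong.empty(), "earliest", 0L, 1_000L);
        System.out.println("start offset: " + start);
    }
}
```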

Example: consuming a DataSail dataset (Source):

```java
package com.volcengine.datasail.streaming.connectors.examples;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.volcengine.datasail.streaming.connectors.datasail.ConsumerConfig;
import com.volcengine.datasail.streaming.connectors.datasail.FlinkDataSailConsumer;

public class SourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Source configuration parameters
        ConsumerConfig consumerConfig = ConsumerConfig.builder()
                .accessKey("Your access key")
                .secretKey("Your secret key")
                .dataset("source_example")
                .subscribeCenter("https://datasail01-cn-beijing.volceapplog.com")
                .consumerGroup("source_example_group")
                .startFromLatest()
                .build();

        // 2. Consume the DataSail dataset, decoding each record as a string
        DataStream<String> stream = env.addSource(new FlinkDataSailConsumer<>(
                consumerConfig, new SimpleStringSchema()));

        // 3. Print messages to standard output
        stream.print();

        // 4. Start execution
        env.execute("Source Example");
    }
}
```
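The source example decodes records with Flink's SimpleStringSchema. With its no-argument constructor, this schema converts a record's bytes to a String as UTF-8, and it applies the same charset in reverse when serializing for a sink. The stdlib sketch below mimics that conversion. It is not the Flink class itself. It can help you confirm that a dataset's payloads really are UTF-8 text before you rely on the string schema. The class name Utf8StringCodec is hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stdlib mimic of SimpleStringSchema's default UTF-8 conversion. */
public final class Utf8StringCodec {
    // Source side: raw record bytes -> String
    static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    // Sink side: String -> raw record bytes
    static byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Unicode escapes keep the source file encoding-independent
        byte[] payload = serialize("\u6570\u636e\u96c6 dataset");
        System.out.println(payload.length + " bytes -> " + deserialize(payload));
    }
}
```

Each of the three CJK characters takes 3 bytes in UTF-8, so non-ASCII payloads are longer in bytes than in characters.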
Sink (producer) configuration parameters:

Parameter | Type | Description | Example |
---|---|---|---|
accessKey | string | Volcengine Access Key | AKLTZW***** |
secretKey | string | Volcengine Secret Key | TW1KaVlURmlaR0******* |
dataset | string | DataSail dataset | byteio_dataset_test |
network | enum | Network type (public or private network) | NETWORK_EXTERNAL |
subscribeCenter | string | Domain of the data subscription configuration center | https://datasail01-cn-beijing.volceapplog.com/ |

Example: producing to a DataSail dataset (Sink):

```java
package com.volcengine.datasail.streaming.connectors.examples;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.volcengine.datasail.streaming.connectors.datasail.FlinkDataSailProducer;
import com.volcengine.datasail.streaming.connectors.datasail.ProducerConfig;

public class SinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Read from any data source; a mock source is used here
        DataStream<String> stream = env.addSource(new InfiniteNumbersSource());

        // 2. Sink configuration parameters
        ProducerConfig producerConfig = ProducerConfig.builder()
                .accessKey("Your access key")
                .secretKey("Your secret key")
                .dataset("sink_example")
                .subscribeCenter("https://datasail01-cn-beijing.volceapplog.com")
                .build();

        // 3. Produce to the DataSail dataset, serializing each element as a string
        stream.addSink(new FlinkDataSailProducer<>(
                producerConfig, new SimpleStringSchema()));

        // 4. Start execution
        env.execute("Sink Example");
    }

    /** Mock source that emits an increasing counter once per second until cancelled. */
    private static class InfiniteNumbersSource implements SourceFunction<String> {
        private volatile boolean isRunning = true;
        private long counter = 0;

        @Override
        public void run(SourceContext<String> context) {
            while (isRunning) {
                try {
                    Thread.sleep(1_000L);
                } catch (InterruptedException e) {
                    // Flink may interrupt the thread on cancellation: restore the flag and stop
                    Thread.currentThread().interrupt();
                    return;
                }
                // Emit under the checkpoint lock so emission does not interleave with checkpoints
                synchronized (context.getCheckpointLock()) {
                    context.collect(Long.toString(counter++));
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
```
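The mock InfiniteNumbersSource stops cooperatively. cancel() clears a volatile isRunning flag from another thread, and run() checks that flag on every loop iteration. Below is a minimal stdlib sketch of the same pattern outside Flink. It is an illustration, not connector code. A hypothetical Consumer<String> stands in for SourceContext, and the interval is configurable so the demo finishes quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stdlib analogue of InfiniteNumbersSource's cancellation pattern. */
public final class CancellableCounter implements Runnable {
    private final Consumer<String> collector; // stands in for Flink's SourceContext
    private final long intervalMillis;
    private volatile boolean isRunning = true; // written by cancel(), read by run()
    private long counter = 0; // only touched by the worker thread

    CancellableCounter(Consumer<String> collector, long intervalMillis) {
        this.collector = collector;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        while (isRunning) {
            collector.accept(Long.toString(counter++)); // emit, then pause
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the counter on its own thread for about runMillis, cancels it, and waits for it to exit
    static List<String> runFor(long runMillis, long intervalMillis) {
        List<String> out = new CopyOnWriteArrayList<>();
        CancellableCounter source = new CancellableCounter(out::add, intervalMillis);
        Thread worker = new Thread(source);
        worker.start();
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop waiting early, but still cancel below
        }
        source.cancel();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runFor(100L, 10L));
    }
}
```

After cancel() the worker finishes its current sleep and then sees the cleared flag. This is why the Flink version also reacts to interruption, which lets it exit without waiting out the full second.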