You need to enable JavaScript to run this app.
导航
流式导入
最近更新时间:2025.02.18 19:45:44首次发布时间:2021.10.22 10:06:12

在 ByteHouse 中,您可以直接通过 Kafka 或 Confluent Cloud 流式传输数据。Kafka 数据导入任务将持续运行,读取 Topic 中的消息。ByteHouse 的 Kafka 任务可以保证 exactly once ,您的数据在消费后即可立即访问。
同时可以随时停止数据导入任务以减少资源使用,并在任何必要的时候恢复该任务。ByteHouse 将在内部记录 offset,以确保停止/恢复过程中不会丢失数据。
支持的 Kafka/Confluent Cloud 版本:0.10 及以上。

必备权限

要将 Kafka 数数据迁移到ByteHouse,需要确保 Kafka 和 ByteHouse 之间的访问权限配置正确。
需要在Kafka中授予4个权限:

  • 列出主题 (Topics)
  • 列出消费者组 (Consumer group)
  • 消费消息 (Consume message)
  • 创建消费者,以及消费者组 (consumers & consumer groups)

有关通过 Kafka 授权命令行界面授予权限的更多信息,请单击此处

创建任务
  1. 在ByteHouse控制台,点击数据加载页签,单击 新建导入任务 按钮,进入任务创建界面。

  2. 填写导入任务基本信息,并选择 Kafka 数据流类型。
    Image

  3. 您可以在下拉框中选择已创建的数据源,如果没有已创建的数据源,您也可以单击连接新的数据源,新建一个Kafka数据源。
    以下为您介绍新建Kafka数据源时的参数配置说明。
    Image

    1. 配置源名称。您可以自定义数据源名称。

    2. 根据实际数据流式写入需求,选择是否勾选“火山内网模式”。
      如果您的Kafka服务在火山引擎上,且对数据写入安全要求比较高,您可勾选需要勾“火山内网模式”,勾选后只能通过内网访问Kafka,因此勾选后您需配置Kafka所在的VPC ID。详情请参见下文高阶用法章节中的通过火山内网模式连接火山 KAFKA

    3. 配置Kafka代理列表 IP 地址,并配置身份验证信息。
      当前 Kafka 数据源支持以下四种鉴权模式,并支持 SSL 加密,选择不同鉴权模式时,Kafka代理列表 IP 地址的配置要求不一致,详情见下表。

      注意

      • 配置的Kafka代理列表 IP 地址,需拼接Kafka 公/私网络地址+端口号,格式为Kafka网络ID:端口号。多个拼接完的Kafka代理列表 IP 地址间通过逗号,分隔。
      • 如果您使用的是火山引擎Kafka,且使用的是私有网络访问方式,您可以通过以下表格获取各鉴权模式下的默认Kafka网卡私有网络ID端口号,若无扩/缩容等运维操作,默认的 IP 地址和端口号不会变。公网访问或其他云厂商的Kafka,可根据对应产品环境要求获取并拼接Kafka代理列表 IP 地址。

      鉴权模式

      默认的Kafka私有网络ID(火山引擎Kafka)

      默认端口号

      None 无鉴权

      1. 在Kafka页面点击的“私有网络”地址链接。
        Image
      2. 在新页面中切换到“网卡”页签,通过私有网络ID过滤,选择结果中的私网IP。
        Image
        Image

      默认端口号为:9092。

      PLAINTEXT 鉴权(支持 SSL)

      • 如果未开启SSL:默认端口号为9093。
      • 如果开启SSL:默认端口号为9095。

      SCRAM-SHA-256(支持 SSL)

      SCRAM-SHA-512(支持 SSL)

  4. 选择数据源后,您可以进一步选择要加载的导入任务的 Topic。您可以选择为该 Topic 创建一个消费者组。然后您可以指定已支持的消费格式。

  5. 定义 Schema 映射。
    您可以参考下文高阶用法的在目标表中生成数据落库的时间戳章节,熟悉了解Schema 映射的配置详情。

  6. 您可以为要加载的 Topic 选择一个表。首次使用时你可以基于解析的消息格式创建新表。

  7. Kafka 定制化筛选器:要在 Kafka 导入任务中使用定制化筛选器,请切换“定制化筛选器”按钮以打开文本框。输入过滤器设置,例如 WHERE column1 = 'abc'。(注意,此功能仅适用于新建的Kafka 导入任务)
    Image

  8. 导入任务一旦创建后将处于暂停状态。然后您就可以开始操作这项任务了。

查看任务

在数据加载页面,您将看到所有类型的所有数据导入任务。
您可以如下图筛选按钮过滤任务:
Image

开启/停止任务
  1. 启动流式导入任务,请进入流式任务页面,然后单击开启
    Image
  2. 如果需要停止流式任务,请进入到流式任务页面内,然后点击停止
    Image
  3. 所有导入任务历史记录将保存在任务详细信息页面。
    Image

高阶用法

在目标表中生成数据落库的时间戳

需要在「数据导入」-> 「新建导入任务」 -> 「定义 Schema 映射」中配置如下红框圈中的列映射:

说明:

  • _c0 是占位符
  • 数据类型需要选择 DateTime
  • Expression 的值是 Bytehouse 的 now() 表达式,该表达式会返回当前时间。

通过火山内网模式连接火山 KAFKA

如果您的Kafka服务在火山引擎上,且对数据写入安全要求比较高,您可勾选需要勾“火山内网模式”,勾选后只能通过内网访问Kafka,因此勾选后您需配置Kafka所在的VPC ID。

注意

如果您勾选了火山内网模式,通过内网访问火山 Kafka,完成以下配置后,您还需要在 KAFKA 所在的 VPC 安全组规则添加白名单:100.64.0.0/10

Kafka所在的VPC ID需配置为火山引擎队列消息kafka实例的私有网络VPC ID来连接。
Image

参数

配置说明

Kafka 所在的VPC ID

配置为火山引擎队列消息kafka实例的私有网络VPC地址。您可以通过以下方式获取kafka实例的私有网络VPC ID:
访问“消息队列Kafka版”中的实例,在实例详情页面中的“私有网络”中可以找到具体的VPC ID。
Image
在新页面中切换到“网卡”页签,通过私有网络ID过滤,选择结果中的私网IP。
Image
Image