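Volcengine DataSail provides a data reporting API. After activating the DataSail service, you can use the API to report data to Volcengine DataSail.
Make sure you have activated the services you need to access. You can activate the DataSail (Global Data Integration) service in the Volcengine console; for details, see Service Activation.
An Access Key is the security credential for accessing Volcengine services. It consists of two parts: an Access Key ID (AK) and a Secret Access Key (SK). Log in to the Volcengine console and go to Access Keys under Access Control to create and manage your Access Keys. For more information, see the Access Key help documentation.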
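Create a topic. Path: Data Collection > Topic Management > Create Topic
Create a collection task. Path: Data Collection > Collection Management > Create Collection Task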
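Field | Description | Example | Notes |
---|---|---|---|
AK | Volcengine Access Key ID | AKLTZWU***** | |
SK | Volcengine Secret Access Key | TW1KaVl****** | |
Collection task ID | ID of the collection task | hkktppvwtuv0xy000 | Find it under Data Collection > Collection Management > Collection Task ID |
Service domain | Domain that data is reported to | datasail01-cn-beijing.volceapplog.com | |
Region | Region used in the signature | cn-north-1 | Fixed value |
Service | Service used in the signature | dataleap | Fixed value |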
Region | Service domain |
---|---|
North China 2 (Beijing) | datasail01-cn-beijing.volceapplog.com |
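When reporting data, combine the service domain and the collection task ID into the final request URL, in the following format:
https://{service-domain}/v1/production/general/collect/{collection-task-ID}/list
For example, if the collection task ID is hkktppvwtuv0xy000 and the region is North China 2 (Beijing), the request URL is: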
https://datasail01-cn-beijing.volceapplog.com/v1/production/general/collect/hkktppvwtuv0xy000/list
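The URL rule above can be expressed as a small helper. This is a minimal Go sketch; the function name `BuildCollectURL` is hypothetical, and escaping the task ID with `url.PathEscape` is a defensive choice of this sketch rather than a documented requirement (plain alphanumeric IDs such as hkktppvwtuv0xy000 pass through unchanged).

```go
package main

import (
	"fmt"
	"net/url"
)

// BuildCollectURL assembles
// https://{domain}/v1/production/general/collect/{taskID}/list.
func BuildCollectURL(domain, taskID string) string {
	return fmt.Sprintf("https://%s/v1/production/general/collect/%s/list",
		domain, url.PathEscape(taskID))
}
```

Called with the Beijing domain and task ID hkktppvwtuv0xy000, it returns the example URL above.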
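Parameter | Type | Required | Meaning | Value |
---|---|---|---|---|
Authorization | String | Yes | Authentication for the Volcengine TOP gateway | Value computed with the Volcengine signature scheme |
Content-Type | String | No | Type of the reported data | For example, application/json for a JSON list or application/octet-stream for a single record, as set in the Java demo |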
For the signature scheme, see Volcengine Signature: https://www.volcengine.com/docs/6369/67269. You can also use the signing SDKs that Volcengine provides: https://www.volcengine.com/docs/6369/156029
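Overall process:
Because the full signing process is fairly involved, we recommend using the SDK that Volcengine already provides. Demos for signing and reporting data are in "Integration Demo" below.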
Report actual data
pom dependency configuration (Java)
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.8</version>
</dependency>
<dependency>
    <groupId>com.volcengine</groupId>
    <artifactId>volc-sdk-java</artifactId>
    <version>1.0.49</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>
Java demo
import com.google.gson.Gson;
import com.volcengine.auth.ISignerV4;
import com.volcengine.auth.impl.SignerV4Impl;
import com.volcengine.model.Credentials;
import com.volcengine.service.SignableRequest;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

import java.nio.charset.StandardCharsets;

public class GeneralCollector {
    // Maximum request body size: 4 MB, checked as 4,000,000 bytes
    private static final int MAX_BODY_BYTES = 4000000;

    private final ISignerV4 signer = new SignerV4Impl();
    private final Credentials credentials = new Credentials();
    private final CloseableHttpClient httpClient;
    private final String url;

    public GeneralCollector(String ak, String sk, String domain, String taskKey) {
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectionRequestTimeout(1000)
                .setSocketTimeout(3000)
                .setConnectTimeout(1000)
                .build();
        httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
        credentials.setAccessKeyID(ak);
        credentials.setSecretAccessKey(sk);
        credentials.setRegion("cn-north-1"); // fixed value
        credentials.setService("dataleap");  // fixed value
        url = "https://" + domain + "/v1/production/general/collect/" + taskKey + "/list";
    }

    // Report a JSON list, e.g. [{"a":1},{"b":"2"}]
    public void batchSendByteData(byte[] data) throws Exception {
        if (hasBigSize(data)) {
            throw new Exception("data size is too big");
        }
        send(data, ContentType.APPLICATION_JSON, "batch_json");
    }

    // Report a single record as raw bytes
    public void sendByteData(byte[] data) throws Exception {
        if (hasBigSize(data)) {
            throw new Exception("data size is too big");
        }
        send(data, ContentType.APPLICATION_OCTET_STREAM, "");
    }

    private void send(byte[] data, ContentType contentType, String collectContentType) throws Exception {
        SignableRequest signableRequest = new SignableRequest();
        signableRequest.setMethod("POST");
        signableRequest.setHeader("Content-Type", contentType.getMimeType());
        if (collectContentType != null && !collectContentType.isEmpty()) {
            signableRequest.setHeader("X-Collect-Content-Type", collectContentType);
        }
        signableRequest.setEntity(new ByteArrayEntity(data, contentType));
        URIBuilder uriBuilder = new URIBuilder(url);
        signableRequest.setUriBuilder(uriBuilder);
        signableRequest.setURI(uriBuilder.build());
        // Sign the request with AK/SK (adds the Authorization header)
        signer.sign(signableRequest, credentials);

        HttpResponse response = httpClient.execute(signableRequest);
        HttpEntity respEntity = response.getEntity();
        int status = response.getStatusLine().getStatusCode();
        if (status != 200) {
            // Release the connection before failing
            EntityUtils.consumeQuietly(respEntity);
            throw new Exception("http status code error: " + status);
        }
        byte[] content = EntityUtils.toByteArray(respEntity);
        RespInfo respInfo = new Gson().fromJson(new String(content, StandardCharsets.UTF_8), RespInfo.class);
        if (respInfo == null) {
            throw new Exception("empty response body");
        }
        if (respInfo.e != null && respInfo.e != 0) {
            throw new Exception("response code error, code: " + respInfo.e + " msg: " + respInfo.m);
        }
    }

    private boolean hasBigSize(byte[] data) {
        return data.length > MAX_BODY_BYTES;
    }

    public static void main(String[] args) throws Exception {
        GeneralCollector collector = new GeneralCollector(
                "AKLTYzlhOGU5YjZjM2U4NGJjYzgzNzFiYmU5N**********",
                "TVdFMlptTmhNVE15TnpjMU5ESTFNR0ZsWlRZM01HRmxObVkx**********==",
                "datasail01-cn-beijing.volceapplog.com",
                "w13311338567c9afe");
        collector.sendByteData("abc".getBytes(StandardCharsets.UTF_8));
        collector.batchSendByteData("[{\"a\":1},{\"b\":\"2\"}]".getBytes(StandardCharsets.UTF_8));
    }

    // Response fields: e = status code, m = message
    static class RespInfo {
        public Integer e;
        public String m;
    }
}
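The Java demo rejects any body over 4,000,000 bytes rather than splitting it. If you need to report a large JSON list, one option is to split it into several batch_json requests. The Go sketch below (the function `SplitJSONBatches` is hypothetical) packs records into JSON arrays that each stay within a byte limit. It mirrors the demo's 4,000,000-byte check; this document does not state whether the server's actual limit is exactly that value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxBodyBytes mirrors the 4,000,000-byte check in the Java demo's hasBigSize.
const maxBodyBytes = 4000000

// SplitJSONBatches encodes each record and packs the results into JSON arrays
// ("[r1,r2,...]") whose total size, brackets and commas included, is <= limit.
func SplitJSONBatches(records []any, limit int) ([][]byte, error) {
	var batches [][]byte
	cur := []byte{'['}
	for _, r := range records {
		enc, err := json.Marshal(r)
		if err != nil {
			return nil, fmt.Errorf("encode record: %w", err)
		}
		// Even alone, a record needs two extra bytes for "[" and "]".
		if len(enc)+2 > limit {
			return nil, fmt.Errorf("record of %d bytes cannot fit in a %d-byte batch", len(enc), limit)
		}
		sep := 0
		if len(cur) > 1 { // the current batch already holds a record
			sep = 1
		}
		// Flush if adding this record (plus "," and the closing "]") would exceed the limit.
		if len(cur)+sep+len(enc)+1 > limit {
			batches = append(batches, append(cur, ']'))
			cur = []byte{'['}
			sep = 0
		}
		if sep == 1 {
			cur = append(cur, ',')
		}
		cur = append(cur, enc...)
	}
	if len(cur) > 1 {
		batches = append(batches, append(cur, ']'))
	}
	return batches, nil
}
```

Each returned batch is a complete JSON list and could be sent with the same headers batchSendByteData uses.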
Go demo (signing uses github.com/volcengine/volc-sdk-golang)
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/volcengine/volc-sdk-golang/base"
)

// RespInfo holds the response fields e (status code) and m (message).
type RespInfo struct {
	E *int   `json:"e"`
	M string `json:"m"`
}

type GeneralCollector struct {
	credentials base.Credentials
	httpClient  *http.Client
	url         string
}

func NewGeneralCollector(ak, sk, domain, taskKey string) *GeneralCollector {
	return &GeneralCollector{
		credentials: base.Credentials{
			AccessKeyID:     ak,
			SecretAccessKey: sk,
			Service:         "dataleap",   // fixed value
			Region:          "cn-north-1", // fixed value
		},
		httpClient: &http.Client{Timeout: 5 * time.Second},
		url:        fmt.Sprintf("https://%s/v1/production/general/collect/%s/list", domain, taskKey),
	}
}

// SendByteData reports a single record.
func (g *GeneralCollector) SendByteData(data []byte) error {
	req, err := http.NewRequest(http.MethodPost, g.url, bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("failed to build request: %v", err)
	}
	signedReq := g.credentials.Sign(req) // sign the request
	httpResponse, err := g.httpClient.Do(signedReq)
	if err != nil {
		return fmt.Errorf("failed to send data: %v", err)
	}
	defer httpResponse.Body.Close()

	if httpResponse.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to send data (status code: %d)", httpResponse.StatusCode)
	}
	response, err := io.ReadAll(httpResponse.Body)
	if err != nil {
		return fmt.Errorf("failed to read response: %v", err)
	}
	respInfo := RespInfo{}
	if err := json.Unmarshal(response, &respInfo); err != nil {
		return fmt.Errorf("failed to parse response: %v", err)
	}
	if respInfo.E == nil {
		return fmt.Errorf("get response error (no status code)")
	}
	if *respInfo.E != 0 {
		return fmt.Errorf("get response error (status code: %d, status msg: %s)", *respInfo.E, respInfo.M)
	}
	return nil
}

func main() {
	// Configure the AK, SK, service domain and collection task ID (taskKey)
	collector := NewGeneralCollector(
		"AKLTYzlhOGU5YjZjM2U4NGJjYzgzNzFiYmU5N**********",
		"TVdFMlptTmhNVE15TnpjMU5ESTFNR0ZsWlRZM01HRmxObVkx**********==",
		"datasail01-cn-beijing.volceapplog.com",
		"w13311338567c9afe",
	)
	if err := collector.SendByteData([]byte("Hello, world!")); err != nil {
		panic(err)
	}
}
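Parameter | Meaning | Details |
---|---|---|
e | Status code | 0 indicates success; any other value indicates failure |
m | Error message if the request failed | "success" when the request succeeds |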
Example request:
POST /v1/production/general/collect/w13311338567c9afe/list HTTP/1.1
Host: datasail01-cn-beijing.volceapplog.com
Authorization: HMAC-SHA256 Credential=AKLTYzlhOGU5YjZjM2U4NGJjYzgzNzFiYmU5NjY4ZDQ1MDU/20230804/cn-north-1/dataleap/request, SignedHeaders=content-type;host;x-content-sha256;x-date, Signature=a2e214ea7d9492bb35f75f48eaf820570083194b4dd29942b6fdacdd278d2ebb
Content-Type: application/x-www-form-urlencoded; charset=utf-8
X-Content-Sha256: 315f5bdb76d078c43b8ac0064e4a0164612b1fce77c869345bfc94c75894edd3
X-Date: 20230804T101113Z

Hello, world!
Example response:
HTTP/1.1 200 OK
Content-Length: 21
Access-Control-Allow-Credentials: true
Access-Control-Allow-Methods: GET, OPTIONS, HEAD, PUT, POST
Access-Control-Max-Age: 1800
Connection: keep-alive
Content-Type: application/json; charset=utf-8
Date: Fri, 04 Aug 2023 10:11:13 GMT
Server: volcclb
Vary: Accept-Encoding
X-Trace-Id: 20230804101112000fe63e5682ecb48b

{"e":0,"m":"success"}
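Per the response table, a report succeeds only when the HTTP status is 200 and e is 0; otherwise m carries the error message. The sketch below isolates that check as a standalone function (`CheckCollectResponse` is a hypothetical name) so it can be tested against the sample response without network access. Treating a missing e as an error follows the Go demo; the Java demo instead accepts a missing e.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// collectResp mirrors the response fields e (status code) and m (message).
type collectResp struct {
	E *int   `json:"e"`
	M string `json:"m"`
}

// CheckCollectResponse returns nil only for HTTP 200 with e == 0.
func CheckCollectResponse(status int, body []byte) error {
	if status != http.StatusOK {
		return fmt.Errorf("unexpected HTTP status %d", status)
	}
	var r collectResp
	if err := json.Unmarshal(body, &r); err != nil {
		return fmt.Errorf("parse response: %w", err)
	}
	if r.E == nil {
		return fmt.Errorf("response has no e field")
	}
	if *r.E != 0 {
		return fmt.Errorf("collect failed: e=%d, m=%s", *r.E, r.M)
	}
	return nil
}
```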