```sql
CREATE DATABASE IF NOT EXISTS online_database;

USE online_database;

CREATE TABLE log_v1 (
    uid        int,
    name       varchar(64),
    age        int,
    phone      varchar(16),
    last_login datetime,
    credits    double
)
DUPLICATE KEY(uid, name);

CREATE TABLE log_v2 (
    uid        int,
    name       varchar(64),
    age        int,
    phone      varchar(16),
    last_login datetime,
    credits    double
)
DUPLICATE KEY(uid, name);
```
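The Spark SQL jobs later in this walkthrough copy rows out of `online_database.log_v1`, so the table needs some data. If it is still empty, the following is a minimal sketch of seeding it through the StarRocks SQL client; the values are illustrative only and can be replaced with your own data:

```sql
-- Hypothetical sample rows; adjust or skip if log_v1 is already populated.
INSERT INTO online_database.log_v1 VALUES
    (1, 'alice', 28, '13800000001', '2024-05-30 10:00:00', 120.5),
    (2, 'bob',   35, '13800000002', '2024-05-31 09:30:00', 87.0);
```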
**Note**
The default database is `hive`. If you use this database, set a default TOS storage location for it.
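One possible way to set that location, assuming your environment supports Spark SQL's `ALTER DATABASE ... SET LOCATION` (available since Spark 3.0) and that you replace the TOS path placeholder with your own bucket and prefix:

```sql
-- Assumption: replace the TOS path with your own bucket/prefix before running.
ALTER DATABASE hive SET LOCATION 'tos://{Database TOS storage address}/hive';
```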
```sql
CREATE DATABASE IF NOT EXISTS ss_w2r_sr;

CREATE TABLE IF NOT EXISTS ss_w2r_sr.log_v1 (
    `uid`        int,
    `name`       string,
    `age`        int,
    `phone`      string,
    `last_login` TIMESTAMP,
    `credits`    double
)
PARTITIONED BY (`date` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'tos://{Database TOS storage address}/log_v1';

CREATE TABLE IF NOT EXISTS ss_w2r_sr.log_v2 (
    `uid`        int,
    `name`       string,
    `age`        int,
    `phone`      string,
    `last_login` TIMESTAMP,
    `credits`    double
)
PARTITIONED BY (`date` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'tos://{Database TOS storage address}/log_v2';
```
Log in to the EMR Serverless Spark job editing page, enter the sample Spark SQL statements shown in the code block below, and click the Run button at the lower right of the job editor to execute the SQL job.
```sql
-- StarRocks FE URL
set spark.sql.catalog.starrocks.fe.http.url=http://{local ip address}:8030;
set spark.sql.catalog.starrocks.fe.jdbc.url=jdbc:mysql://{local ip address}:9030;
-- user name with admin rights on the StarRocks instance
set spark.sql.catalog.starrocks.user={user name};
-- user password
set spark.sql.catalog.starrocks.password={user password};
-- StarRocks instance TOS endpoint, such as https://tos-s3-cn-beijing.ivolces.com
set spark.sql.catalog.starrocks.fs.s3a.endpoint={endpoint};
-- StarRocks instance TOS region, such as cn-beijing
set spark.sql.catalog.starrocks.fs.s3a.endpoint.region={region name};
set spark.sql.catalog.starrocks.fs.s3a.access.key={access key};
set spark.sql.catalog.starrocks.fs.s3a.secret.key={secret key};
set tqs.query.engine.type = sparkcli;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = {vpc id};
set las.cross.vpc.subnet.id = {subnet id};
set las.cross.vpc.security.group.id = {security group id};
set las.cross.vpc.accountId = {volc account id};
set spark.sql.catalog.starrocks.batch.size=8092;
set spark.sql.catalog.starrocks.writer.mode=BYPASS;
set spark.sql.catalog.starrocks.reader.mode=BYPASS;
set spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true;
set spark.sql.catalog.starrocks.fs.s3a.path.style.access=false;
set spark.sql.catalog.starrocks.fs.s3a.retry.limit=27;
set spark.sql.catalog.starrocks=com.starrocks.connector.spark.catalog.StarRocksCatalog;
set spark.sql.storeAssignmentPolicy = ansi;
-- dependency JAR address; see "Dependency JAR addresses" for the JAR addresses of online regions
set las.spark.jar.depend.jars = [{"fileName":"{dependency jar address}"}];
set spark.shuffle.manager = sort;
set spark.dynamicAllocation.enabled = false;
set spark.sql.serverless.lf.auth.enabled = false;
set spark.sql.extensions=com.starrocks.connector.spark.StarRocksExtensions;

use starrocks;

insert into table online_database.log_v2
select * from online_database.log_v1;

select * from online_database.log_v2;
```
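If the full `SELECT *` output is too large to inspect, a lighter check is to compare row counts between source and target. A minimal sketch, run in the same session after `use starrocks;`:

```sql
-- Row counts of log_v1 and log_v2 should match once the job completes.
SELECT COUNT(*) FROM online_database.log_v1;
SELECT COUNT(*) FROM online_database.log_v2;
```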
Log in to the EMR Serverless Spark job editing page, enter the sample Spark SQL statements shown in the code block below, and click the Run button at the lower right of the job editor to execute the SQL job.
```sql
-- StarRocks FE URL
set spark.sql.catalog.starrocks.fe.http.url=http://{local ip address}:8030;
set spark.sql.catalog.starrocks.fe.jdbc.url=jdbc:mysql://{local ip address}:9030;
-- user name with admin rights on the StarRocks instance
set spark.sql.catalog.starrocks.user={user name};
-- user password
set spark.sql.catalog.starrocks.password={user password};
-- StarRocks instance TOS endpoint, such as https://tos-s3-cn-beijing.ivolces.com
set spark.sql.catalog.starrocks.fs.s3a.endpoint={endpoint};
-- StarRocks instance TOS region, such as cn-beijing
set spark.sql.catalog.starrocks.fs.s3a.endpoint.region={region name};
set spark.sql.catalog.starrocks.fs.s3a.access.key={access key};
set spark.sql.catalog.starrocks.fs.s3a.secret.key={secret key};
set tqs.query.engine.type = sparkcli;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = {vpc id};
set las.cross.vpc.subnet.id = {subnet id};
set las.cross.vpc.security.group.id = {security group id};
set las.cross.vpc.accountId = {volc account id};
set spark.sql.catalog.starrocks.batch.size=8092;
set spark.sql.catalog.starrocks.writer.mode=BYPASS;
set spark.sql.catalog.starrocks.reader.mode=BYPASS;
set spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true;
set spark.sql.catalog.starrocks.fs.s3a.path.style.access=false;
set spark.sql.catalog.starrocks.fs.s3a.retry.limit=27;
set spark.sql.catalog.starrocks=com.starrocks.connector.spark.catalog.StarRocksCatalog;
set spark.sql.storeAssignmentPolicy = ansi;
-- dependency JAR address; see "Dependency JAR addresses" for the JAR addresses of online regions
set las.spark.jar.depend.jars = [{"fileName":"{dependency jar address}"}];
set spark.shuffle.manager = sort;
set spark.dynamicAllocation.enabled = false;
set spark.sql.serverless.lf.auth.enabled = false;
set spark.sql.extensions=com.starrocks.connector.spark.StarRocksExtensions;

use starrocks;

WITH temp AS (
    select uid, name, age, phone, last_login, credits
    from online_database.log_v1
)
insert overwrite table spark_catalog.ss_w2r_sr.log_v1 partition(date)
select
    uid + cast(rand() * 900 + 10 as int),
    CONCAT(name, '_1'),
    age,
    phone,
    last_login,
    credits,
    '20240601'
from temp;

select * from spark_catalog.ss_w2r_sr.log_v1;
```
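To confirm that the dynamic partition was written, a quick check (assuming the job above has finished) is to count rows per partition of the Hive table:

```sql
-- Should return one row per partition; only date=20240601 is expected here.
SELECT `date`, COUNT(*) AS row_cnt
FROM spark_catalog.ss_w2r_sr.log_v1
GROUP BY `date`;
```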
Log in to the EMR Serverless Spark job editing page, enter the sample Spark SQL statements shown in the code block below, and click the Run button at the lower right of the job editor to execute the SQL job.
```sql
-- StarRocks FE URL
set spark.sql.catalog.starrocks.fe.http.url=http://{local ip address}:8030;
set spark.sql.catalog.starrocks.fe.jdbc.url=jdbc:mysql://{local ip address}:9030;
-- user name with admin rights on the StarRocks instance
set spark.sql.catalog.starrocks.user={user name};
-- user password
set spark.sql.catalog.starrocks.password={user password};
-- StarRocks instance TOS endpoint, such as https://tos-s3-cn-beijing.ivolces.com
set spark.sql.catalog.starrocks.fs.s3a.endpoint={endpoint};
-- StarRocks instance TOS region, such as cn-beijing
set spark.sql.catalog.starrocks.fs.s3a.endpoint.region={region name};
set spark.sql.catalog.starrocks.fs.s3a.access.key={access key};
set spark.sql.catalog.starrocks.fs.s3a.secret.key={secret key};
set tqs.query.engine.type = sparkcli;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = {vpc id};
set las.cross.vpc.subnet.id = {subnet id};
set las.cross.vpc.security.group.id = {security group id};
set las.cross.vpc.accountId = {volc account id};
set spark.sql.catalog.starrocks.batch.size=8092;
set spark.sql.catalog.starrocks.writer.mode=BYPASS;
set spark.sql.catalog.starrocks.reader.mode=BYPASS;
set spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true;
set spark.sql.catalog.starrocks.fs.s3a.path.style.access=false;
set spark.sql.catalog.starrocks.fs.s3a.retry.limit=27;
set spark.sql.catalog.starrocks=com.starrocks.connector.spark.catalog.StarRocksCatalog;
set spark.sql.storeAssignmentPolicy = ansi;
-- dependency JAR address; see "Dependency JAR addresses" for the JAR addresses of online regions
set las.spark.jar.depend.jars = [{"fileName":"{dependency jar address}"}];
set spark.shuffle.manager = sort;
set spark.dynamicAllocation.enabled = false;
set spark.sql.serverless.lf.auth.enabled = false;
set spark.sql.extensions=com.starrocks.connector.spark.StarRocksExtensions;

use starrocks;

WITH temp AS (
    select uid, name, age, phone, last_login, credits
    from spark_catalog.ss_w2r_sr.log_v1
)
insert into table online_database.log_v1
select * from temp;

select * from online_database.log_v1;
```
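As a final sanity check on the round trip (StarRocks to Hive and back), you can look at the row count of the original StarRocks table. A minimal sketch, run in the same session after `use starrocks;`:

```sql
-- After the write-back, log_v1 holds both the original rows and the rows copied
-- back from Hive (with uid shifted and '_1' appended to name), so the count
-- should have roughly doubled.
SELECT COUNT(*) FROM online_database.log_v1;
```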
| Region | JAR address |
| --- | --- |
| North China (Beijing) | tos://emr-serverless-spark/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar |
| East China (Shanghai) | tos://emr-serverless-spark-shanghai/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar |
| South China (Guangzhou) | tos://emr-serverless-spark-guangzhou/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar |
| Southeast Asia (Johor) | tos://emr-serverless-spark-ap-southeast-1/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar |