本文介绍如何在 PHP 开发环境连接并访问 ByteHouse 云数仓。
| 细分项 | 已验证版本 |
|---|---|
| ClickHouse 驱动程序版本 | 已验证版本:1.6.0,版本详情及下载链接请参见:phpClickhouse v1.6.0 |
| PHP 版本 | 已验证版本:PHP 8.4 或更高版本 |
您可以通过 composer 安装 ClickHouse PHP 驱动:
composer require smi2/phpclickhouse
本文的示例代码还使用 ramsey/uuid 生成 UUID,如需运行示例,请一并安装:
composer require ramsey/uuid
ByteHouse 支持通过 IAM 用户或数据库用户连接 ClickHouse PHP Driver。IAM 用户与数据库用户二者差异说明如下,您可按需选择。
更多 IAM 用户和数据库用户的介绍请参见以下文档:
参数 | 配置要点 |
|---|---|
host | 配置为 ByteHouse 的公网/私网连接域名,其中 {TENANT_ID}、{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息。 |
port | 固定配置为:8123。 |
database | 配置为连接 ByteHouse 的数据库名称。 |
user & password |
|
https | 固定配置为:true。 |
$db->settings()->set() | 可选,用于设置 ByteHouse 服务器相关参数,例如,配置连接 ByteHouse 后使用的计算组。如果您有多个计算组,希望后续查询 ByteHouse 数据时使用指定计算组,或者希望设置其他指定参数,您可以在 |
参数 | 配置要点 |
|---|---|
host | 配置为 ByteHouse 的公网/私网连接域名,其中 {TENANT_ID}、{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息。 |
port | 固定配置为:8123。 |
database | 配置为连接 ByteHouse 的数据库名称。 |
user & password |
|
https | 固定配置为:true。 |
$db->settings()->set() | 可选,用于设置 ByteHouse 服务器相关参数,例如,配置连接 ByteHouse 后使用的计算组。如果您有多个计算组,希望后续查询 ByteHouse 数据时使用指定计算组,或者希望设置其他指定参数,您可以在 |
连接 ByteHouse 后,您可以通过 ClickHouse PHP Driver 进行建表、数据读写等操作,以下为简单的操作示例。
可参考下面代码连接至 ByteHouse,使用时注意替换连接语句中的 {Host}、{User}、{Password}、{Database}、{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息。
<?php

require_once 'vendor/autoload.php';

use Ramsey\Uuid\Uuid;

/**
 * Minimal example that opens a connection to ByteHouse through the phpClickHouse client.
 */
class Example
{
    private $client;
    private $config;

    /**
     * Stores the connection settings and initializes the client right away.
     *
     * @param string      $host             Connection host name
     * @param int         $port             Connection port
     * @param string      $user             User name
     * @param string      $password         Password
     * @param string      $database         Database name
     * @param string|null $virtualWarehouse Optional value for the `virtual_warehouse` setting
     *
     * @throws Exception when the client cannot be initialized
     */
    public function __construct(
        string $host,
        int $port,
        string $user,
        string $password,
        string $database,
        ?string $virtualWarehouse = null
    ) {
        $this->config = [
            'host' => $host,
            'port' => $port,
            'username' => $user,
            'password' => $password,
            'database' => $database,
            'https' => true,
        ];

        $this->initializeClient($virtualWarehouse);
    }

    /**
     * Creates the client, applies the optional virtual warehouse setting and the timeouts,
     * then pings the server.
     *
     * @throws Exception wrapping any exception raised while setting up the client
     */
    private function initializeClient(?string $virtualWarehouse): void
    {
        try {
            $this->client = new ClickHouseDB\Client($this->config);

            if ($virtualWarehouse !== null && $virtualWarehouse !== '') {
                $this->client->settings()->set('virtual_warehouse', $virtualWarehouse);
            }

            // Configure the timeouts first so they are in place before the first request (ping) is sent.
            $this->client->setTimeout(60);
            $this->client->setConnectTimeOut(10);
            $this->client->ping();
        } catch (Exception $e) {
            // Keep the original exception as "previous" so the root cause is not lost.
            throw new Exception("初始化ClickHouse客户端失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Drops the reference to the client when the example object is destroyed.
     */
    public function __destruct()
    {
        unset($this->client);
    }
}

try {
    $host = "{Host}";
    $port = 8123;
    $password = "{Password}";
    $user = "{User}";
    $database = "{Database}";
    $virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}";

    // Constructing the object performs the connection; this class has no further steps to run.
    $example = new Example($host, $port, $user, $password, $database, $virtual_warehouse_id);
    echo "连接 ByteHouse 成功\n";
} catch (Exception $e) {
    // Print the message as well as the trace; the trace alone does not say what went wrong.
    echo "程序执行失败: " . $e->getMessage() . "\n" . $e->getTraceAsString() . "\n";
    exit(1);
}
?>
您可使用 settings 中的 set 设置 session 级别的 query ID。
注意
请注意,使用这种方式时,需要在每一条 SQL 执行之前重新设置 query ID。如果只设置一次 query ID,则后续所有 SQL 都会使用相同(重复)的 query ID。
// Build a unique query ID and store it in the client's settings for the next statement.
$customQueryId = 'customized_' . Uuid::uuid4()->toString();
$this->client->settings()->set('query_id', $customQueryId);
<?php

require_once 'vendor/autoload.php';

use Ramsey\Uuid\Uuid;

/**
 * End-to-end example: connect, create a database and table, insert rows, read them back
 * and drop the table again. Every statement is sent with its own custom query ID.
 */
class Example
{
    private $client;
    private $config;

    /**
     * Stores the connection settings and initializes the client right away.
     *
     * @param string      $host             Connection host name
     * @param int         $port             Connection port
     * @param string      $user             User name
     * @param string      $password         Password
     * @param string      $database         Database name
     * @param string|null $virtualWarehouse Optional value for the `virtual_warehouse` setting
     *
     * @throws Exception when the client cannot be initialized
     */
    public function __construct(
        string $host,
        int $port,
        string $user,
        string $password,
        string $database,
        ?string $virtualWarehouse = null
    ) {
        $this->config = [
            'host' => $host,
            'port' => $port,
            'username' => $user,
            'password' => $password,
            'database' => $database,
            'https' => true,
        ];

        $this->initializeClient($virtualWarehouse);
    }

    /**
     * Creates the client, applies the optional virtual warehouse setting and the timeouts,
     * then pings the server.
     *
     * @throws Exception wrapping any exception raised while setting up the client
     */
    private function initializeClient(?string $virtualWarehouse): void
    {
        try {
            $this->client = new ClickHouseDB\Client($this->config);

            if ($virtualWarehouse !== null && $virtualWarehouse !== '') {
                $this->client->settings()->set('virtual_warehouse', $virtualWarehouse);
            }

            // Configure the timeouts first so they are in place before the first request (ping) is sent.
            $this->client->setTimeout(60);
            $this->client->setConnectTimeOut(10);
            $this->client->ping();
        } catch (Exception $e) {
            // Keep the original exception as "previous" so the root cause is not lost.
            throw new Exception("初始化ClickHouse客户端失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Runs all steps of the example in order. Exceptions propagate to the caller.
     */
    public function runExample()
    {
        $this->cleanupDatabase();
        $this->createTable();
        $this->insertData();
        $this->queryData();
        $this->cleanupTable();
    }

    /**
     * Stores a fresh, unique query ID in the client's settings.
     *
     * The setting stays in place until it is overwritten, so this has to be called before
     * every statement; otherwise consecutive statements would reuse the same query ID.
     */
    private function refreshQueryId(): void
    {
        $this->client->settings()->set('query_id', 'customized_' . Uuid::uuid4()->toString());
    }

    /**
     * Sends a write statement with its own query ID.
     */
    private function write(string $sql)
    {
        $this->refreshQueryId();

        return $this->client->write($sql);
    }

    /**
     * Recreates the demo database and selects it as the client's current database.
     */
    private function cleanupDatabase()
    {
        $this->write("DROP DATABASE IF EXISTS bhpythontest");
        $this->write("CREATE DATABASE IF NOT EXISTS bhpythontest");
        $this->client->database('bhpythontest');

        echo "数据库清理和创建完成\n";
    }

    /**
     * Creates the demo table if it does not exist yet.
     */
    private function createTable()
    {
        $sql = "
            CREATE TABLE IF NOT EXISTS bhpythontest.example (
                Col1 UInt8,
                Col2 String,
                Col3 FixedString(3),
                Col4 UUID,
                Col5 Map(String, UInt8),
                Col6 Array(String),
                Col7 Tuple(String, UInt8, Array(Map(String, String))) KV,
                Col8 DateTime
            ) ENGINE = CnchMergeTree()
            ORDER BY tuple()
        ";

        $this->write($sql);

        echo "表创建成功\n";
    }

    /**
     * Inserts 100 rows with a single INSERT statement and prints the elapsed time.
     */
    private function insertData()
    {
        $start = microtime(true);

        $values = [];
        for ($i = 0; $i < 100; $i++) {
            $values[] = sprintf(
                "(42, 'ClickHouse', 'Inc', '%s', map('key', 1, 'key2', %d), ['Q','W','E','R','T','Y'], ('String Value', 5, [map('key','value')]), now())",
                Uuid::uuid4()->toString(),
                $i + 1
            );
        }

        $insertSQL = "INSERT INTO bhpythontest.example VALUES " . implode(', ', $values);
        $this->write($insertSQL);

        $time = microtime(true) - $start;
        echo sprintf("插入记录成功,耗时: %.2f 秒\n", $time);
    }

    /**
     * Reads up to five rows from the demo table and prints each column.
     */
    private function queryData()
    {
        $this->refreshQueryId();
        // Use the fully qualified table name, consistent with createTable() and insertData().
        $result = $this->client->select("SELECT * FROM bhpythontest.example LIMIT 5");

        if ($result->count() <= 0) {
            return;
        }

        echo "前5条记录:\n";
        echo str_repeat("-", 80) . "\n";

        foreach ($result->rows() as $index => $row) {
            echo sprintf("记录 %d:\n", $index + 1);
            echo sprintf(" Col1: %d\n", $row['Col1']);
            echo sprintf(" Col2: %s\n", $row['Col2']);
            echo sprintf(" Col3: %s\n", $row['Col3']);
            echo sprintf(" Col4: %s\n", $row['Col4']);
            echo sprintf(" Col5: %s\n", json_encode($row['Col5']));
            echo sprintf(" Col6: %s\n", json_encode($row['Col6']));
            echo sprintf(" Col7: %s\n", json_encode($row['Col7']));
            echo sprintf(" Col8: %s\n", $row['Col8']);
            echo "\n";
        }
    }

    /**
     * Drops the demo table.
     */
    private function cleanupTable()
    {
        $this->write("DROP TABLE IF EXISTS bhpythontest.example");

        echo "表清理完成\n";
    }

    /**
     * Drops the reference to the client when the example object is destroyed.
     */
    public function __destruct()
    {
        unset($this->client);
    }
}

try {
    $host = "{Host}";
    $port = 8123;
    $password = "{Password}";
    $user = "{User}";
    $database = "{Database}";
    $virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}";

    $example = new Example($host, $port, $user, $password, $database, $virtual_warehouse_id);
    $example->runExample();
} catch (Exception $e) {
    // Print the message as well as the trace; the trace alone does not say what went wrong.
    echo "程序执行失败: " . $e->getMessage() . "\n" . $e->getTraceAsString() . "\n";
    exit(1);
}
?>
您可以使用 insertBatchFiles() 并行插入多个 CSV 文件,从而高效地将数据批量加载到 ByteHouse。
<?php

require_once 'vendor/autoload.php';

use Ramsey\Uuid\Uuid;

/**
 * Writes a CSV file with $numRows demo rows (event_date, user_id, message, value).
 *
 * @param string $filename Path of the file to create or overwrite
 * @param int    $numRows  Number of rows to write
 *
 * @throws RuntimeException when the file cannot be opened for writing
 */
function generateDemoCSV($filename, $numRows): void
{
    $fp = fopen($filename, 'w');
    if ($fp === false) {
        // Fail with a clear message instead of a TypeError from fputcsv() further down.
        throw new RuntimeException("Unable to open $filename for writing");
    }

    try {
        for ($i = 0; $i < $numRows; $i++) {
            fputcsv($fp, [
                date('Y-m-d'),     // event_date
                rand(1000, 9999),  // user_id
                'Message ' . $i,   // message
                rand(1, 100),      // value
            ], ',', '"', "\\", "\n");
        }
    } finally {
        // Close the handle even if writing a row fails.
        fclose($fp);
    }

    echo "Generated $numRows rows in $filename\n";
}

/**
 * Example that bulk-loads generated CSV files into a table with insertBatchFiles().
 */
class Example
{
    private $client;
    private $config;

    /**
     * Stores the connection settings and initializes the client right away.
     *
     * @param string      $host             Connection host name
     * @param int         $port             Connection port
     * @param string      $user             User name
     * @param string      $password         Password
     * @param string      $database         Database name
     * @param string|null $virtualWarehouse Optional value for the `virtual_warehouse` setting
     *
     * @throws Exception when the client cannot be initialized
     */
    public function __construct(
        string $host,
        int $port,
        string $user,
        string $password,
        string $database,
        ?string $virtualWarehouse = null
    ) {
        $this->config = [
            'host' => $host,
            'port' => $port,
            'username' => $user,
            'password' => $password,
            'database' => $database,
            'https' => true,
        ];

        $this->initializeClient($virtualWarehouse);
    }

    /**
     * Creates the client, applies the optional virtual warehouse setting and the timeouts,
     * then pings the server.
     *
     * @throws Exception wrapping any exception raised while setting up the client
     */
    private function initializeClient(?string $virtualWarehouse): void
    {
        try {
            $this->client = new ClickHouseDB\Client($this->config);

            if ($virtualWarehouse !== null && $virtualWarehouse !== '') {
                $this->client->settings()->set('virtual_warehouse', $virtualWarehouse);
            }

            // Configure the timeouts first so they are in place before the first request (ping) is sent.
            $this->client->setTimeout(60);
            $this->client->setConnectTimeOut(10);
            $this->client->ping();
        } catch (Exception $e) {
            // Keep the original exception as "previous" so the root cause is not lost.
            throw new Exception("初始化ClickHouse客户端失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Recreates the demo database, then runs the CSV batch-insert demo.
     */
    public function runExample()
    {
        $this->cleanupDatabase();
        $this->insertIntoFromCSV();
    }

    /**
     * Creates demo_table, generates two CSV files, loads them with insertBatchFiles()
     * and prints the resulting row count. Errors are reported on stdout; the generated
     * files are removed afterwards in every case.
     */
    private function insertIntoFromCSV()
    {
        // File path => number of rows to generate.
        $files = [
            __DIR__ . '/demo_data_1.csv' => 1000,
            __DIR__ . '/demo_data_2.csv' => 1000,
        ];

        try {
            $this->client->write('DROP TABLE IF EXISTS demo_table');
            $this->client->write('
                CREATE TABLE demo_table (
                    event_date Date,
                    user_id UInt32,
                    message String,
                    value UInt32
                ) ENGINE = CnchMergeTree
                ORDER BY tuple()
            ');

            // Generate CSV files
            foreach ($files as $file => $rows) {
                generateDemoCSV($file, $rows);
            }

            // Insert data via Batch Operation
            $this->client->insertBatchFiles(
                'demo_table',
                array_keys($files),
                ['event_date', 'user_id', 'message', 'value']
            );

            $totalCount = $this->client->select('SELECT count() as count FROM demo_table')->fetchOne('count');
            echo "Total rows: $totalCount\n";
        } catch (\Exception $e) {
            echo "Error: " . $e->getMessage() . "\n";
        } finally {
            // Do not leave the temporary CSV files behind, whatever the outcome.
            foreach (array_keys($files) as $file) {
                if (is_file($file)) {
                    unlink($file);
                }
            }
        }
    }

    /**
     * Recreates the demo database and selects it as the client's current database.
     */
    private function cleanupDatabase()
    {
        $this->client->write("DROP DATABASE IF EXISTS bhpythontest");
        $this->client->write("CREATE DATABASE IF NOT EXISTS bhpythontest");
        $this->client->database('bhpythontest');

        echo "数据库清理和创建完成\n";
    }

    /**
     * Drops the reference to the client when the example object is destroyed.
     */
    public function __destruct()
    {
        unset($this->client);
    }
}

try {
    $host = "{Host}";
    $port = 8123;
    $password = "{Password}";
    $user = "{User}";
    $database = "{Database}";
    $virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}";

    $example = new Example($host, $port, $user, $password, $database, $virtual_warehouse_id);
    $example->runExample();
} catch (Exception $e) {
    // Print the message as well as the trace; the trace alone does not say what went wrong.
    echo "程序执行失败: " . $e->getMessage() . "\n" . $e->getTraceAsString() . "\n";
    exit(1);
}
?>
当前支持通过':param'语法、'{param}'语法,使用占位符安全地将参数值传递给查询,示例如下。
<?php

require_once 'vendor/autoload.php';

use Ramsey\Uuid\Uuid;

/**
 * Example that passes values into queries through `:param` bindings and `{param}` templates.
 */
class Example
{
    private $client;
    private $config;

    /**
     * Stores the connection settings and initializes the client right away.
     *
     * @param string      $host             Connection host name
     * @param int         $port             Connection port
     * @param string      $user             User name
     * @param string      $password         Password
     * @param string      $database         Database name
     * @param string|null $virtualWarehouse Optional value for the `virtual_warehouse` setting
     *
     * @throws Exception when the client cannot be initialized
     */
    public function __construct(
        string $host,
        int $port,
        string $user,
        string $password,
        string $database,
        ?string $virtualWarehouse = null
    ) {
        $this->config = [
            'host' => $host,
            'port' => $port,
            'username' => $user,
            'password' => $password,
            'database' => $database,
            'https' => true,
        ];

        $this->initializeClient($virtualWarehouse);
    }

    /**
     * Creates the client, applies the optional virtual warehouse setting and the timeouts,
     * then pings the server.
     *
     * @throws Exception wrapping any exception raised while setting up the client
     */
    private function initializeClient(?string $virtualWarehouse): void
    {
        try {
            $this->client = new ClickHouseDB\Client($this->config);

            if ($virtualWarehouse !== null && $virtualWarehouse !== '') {
                $this->client->settings()->set('virtual_warehouse', $virtualWarehouse);
            }

            // Configure the timeouts first so they are in place before the first request (ping) is sent.
            $this->client->setTimeout(60);
            $this->client->setConnectTimeOut(10);
            $this->client->ping();
        } catch (Exception $e) {
            // Keep the original exception as "previous" so the root cause is not lost.
            throw new Exception("初始化ClickHouse客户端失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Recreates the demo database, then runs the parameter-binding demo.
     */
    public function runExample()
    {
        $this->cleanupDatabase();
        $this->bindParameters();
    }

    /**
     * Creates and fills user_events, then runs four queries that show the supported
     * ways of passing values into a query. Errors are reported on stdout.
     */
    private function bindParameters()
    {
        try {
            // Create a sample table
            $this->client->write('DROP TABLE IF EXISTS user_events');
            $this->client->write('
                CREATE TABLE user_events (
                    event_date Date,
                    event_type String,
                    user_id UInt32,
                    score Float32,
                    tags Array(String)
                ) ENGINE = CnchMergeTree
                ORDER BY tuple()
            ');

            // Insert sample data
            $this->client->insert(
                'user_events',
                [
                    ['2024-01-01', 'login', 101, 0.5, ['new', 'mobile']],
                    ['2024-01-01', 'purchase', 102, 0.8, ['vip', 'desktop']],
                    ['2024-01-02', 'login', 101, 0.3, ['mobile']],
                ],
                ['event_date', 'event_type', 'user_id', 'score', 'tags']
            );

            // Example 1: basic parameter binding
            $result = $this->client->select(
                'SELECT * FROM user_events WHERE user_id = :id',
                ['id' => 101]
            );
            echo "Events for user 101:\n";
            print_r($result->rows());

            // Example 2: binding several parameters
            $result = $this->client->select(
                'SELECT * FROM user_events WHERE event_date = :date AND event_type = :type',
                [
                    'date' => '2024-01-01',
                    'type' => 'login',
                ]
            );
            echo "\nLogin events on 2024-01-01:\n";
            print_r($result->rows());

            // Example 3: binding an array
            $result = $this->client->select(
                'SELECT * FROM user_events WHERE user_id IN (:users) AND score >= :min_score',
                [
                    'users' => [101, 102],
                    'min_score' => 0.5,
                ]
            );
            echo "\nHigh-scoring events for selected users:\n";
            print_r($result->rows());

            // Example 4: {param} template used for the column list
            $columns = 'event_date, event_type, user_id';
            $result = $this->client->select(
                'SELECT {cols} FROM user_events WHERE score > :score',
                [
                    'cols' => $columns,
                    'score' => 0.4,
                ]
            );
            echo "\nSelected columns for high scores:\n";
            print_r($result->rows());
        } catch (\Exception $e) {
            echo "Error: " . $e->getMessage() . "\n";
        }
    }

    /**
     * Recreates the demo database and selects it as the client's current database.
     */
    private function cleanupDatabase()
    {
        $this->client->write("DROP DATABASE IF EXISTS bhpythontest");
        $this->client->write("CREATE DATABASE IF NOT EXISTS bhpythontest");
        $this->client->database('bhpythontest');

        echo "数据库清理和创建完成\n";
    }

    /**
     * Drops the reference to the client when the example object is destroyed.
     */
    public function __destruct()
    {
        unset($this->client);
    }
}

try {
    $host = "{Host}";
    $port = 8123;
    $password = "{Password}";
    $user = "{User}";
    $database = "{Database}";
    $virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}";

    $example = new Example($host, $port, $user, $password, $database, $virtual_warehouse_id);
    $example->runExample();
} catch (Exception $e) {
    // Print the message as well as the trace; the trace alone does not say what went wrong.
    echo "程序执行失败: " . $e->getMessage() . "\n" . $e->getTraceAsString() . "\n";
    exit(1);
}
?>