表格数据库 HBase 版默认提供 ZK 连接地址,同时也支持通过 Thrift 进行多语言访问。Thrift 是 HBase 标准版实例中的一种服务组件,基于 Apache Thrift(支持多语言的通信框架)开发。本文介绍如何使用 Python 程序通过 Thrift2 地址访问 HBase 实例。
# Print the version of the Python 3 interpreter that will run the demo.
python3 --version
执行上述命令,检查当前 Python 的版本。
说明
表格数据库 HBase 版默认未开通 Thrift2 地址,您需要先申请 Thrift2 连接地址。具体操作,请参见申请 Thrift2 连接地址。
下载已编译好的 Thrift 文件(文件中包含了可供 Python 访问的 HBase Thrift 模块),并将其保存在已安装了 Thrift 服务的 ECS 实例或本地设备上。
# Extract the downloaded Thrift file
tar -zxvf gen-py.tar.gz
# Enter the gen-py directory
cd gen-py
配置 Python 环境,以便引用 HBase Thrift 模块:将上述 gen-py 目录下的 hbase 目录移动到 Python 安装目录下的 site-packages 目录中(也可以将 gen-py 目录添加到 PYTHONPATH 环境变量中)。
命令如下。
# Move the generated hbase package into the site-packages directory of the
# python3 interpreter that will run the demo (the path is queried from the
# interpreter itself instead of being typed by hand).
mv hbase/ "$(python3 -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')/"
安装 Thrift 的 Python 运行库(thrift)。
命令如下。
# Install the Apache Thrift Python runtime into the same interpreter used above.
python3 -m pip install thrift
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Demo: access an HBase instance through its Thrift2 endpoint from Python.

Requires the ``thrift`` runtime package and the generated ``hbase`` package
(``thrift --gen py hbase.thrift``) to be importable.
"""
import random  # NOTE(review): not used by the demo below.

# Open a Thrift2 connection through TTransport, TSocket and TBinaryProtocol.
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol

# Generated by: thrift --gen py hbase.thrift
from hbase import THBaseService
from hbase.ttypes import (
    TPut,
    TColumnValue,
    TGet,
    TNamespaceDescriptor,
    TTableDescriptor,
    TColumnFamilyDescriptor,
    TTableName,
    TScan,
)


class Demo:
    """Thin wrapper around ``THBaseService.Client`` for common operations.

    The connection is opened in ``__init__``. Call :meth:`close`, or use the
    instance in a ``with`` statement, to release it deterministically;
    ``__del__`` is only a best-effort fallback.

    String arguments (namespaces, tables, rows, families, qualifiers, values)
    are encoded as UTF-8 before being sent to the server.
    """

    def __init__(self, host, port):
        """Open a framed, binary-protocol Thrift2 connection to ``host:port``."""
        socket = TSocket.TSocket(host=host, port=port)
        self.transport = TTransport.TFramedTransport(socket)
        protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = THBaseService.Client(protocol)
        self.transport.open()

    def close(self):
        """Close the underlying Thrift transport."""
        # getattr guard: __init__ may have failed before self.transport was set.
        transport = getattr(self, "transport", None)
        if transport is not None:
            transport.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    def __del__(self):
        # Fallback only: the timing of __del__ is not guaranteed.
        self.close()

    def get_ns_list(self):
        """Return the names of all namespaces on the server."""
        return [ns.name for ns in self.client.listNamespaceDescriptors()]

    def namespace_exists(self, namespace):
        """Return True if ``namespace`` is one of the server's namespaces."""
        return namespace in self.get_ns_list()

    def create_ns(self, namespace):
        """Create ``namespace`` unless it already exists."""
        if not self.namespace_exists(namespace):
            self.client.createNamespace(TNamespaceDescriptor(name=namespace))

    def create_table(self, table_name, column_family, namespace="ns"):
        """Create ``namespace:table_name`` with one column family if absent.

        The namespace is created first when it does not exist yet.
        """
        self.create_ns(namespace)
        if self.table_exists(namespace, table_name):
            return
        descriptor = TTableDescriptor(
            tableName=self._table_name(namespace, table_name),
            columns=[TColumnFamilyDescriptor(name=column_family.encode("utf8"))],
        )
        # splitKeys=None: no pre-split keys are passed.
        self.client.createTable(descriptor, None)

    def table_exists(self, namespace, table_name):
        """Return the server's answer to whether ``namespace:table_name`` exists."""
        return self.client.tableExists(self._table_name(namespace, table_name))

    def put(self, namespace, table_name, row_key, family, qualifier, value):
        """Write a single cell ``family:qualifier = value`` into ``row_key``."""
        column_value = TColumnValue(
            family=family.encode("utf8"),
            qualifier=qualifier.encode("utf8"),
            value=value.encode("utf8"),
        )
        put = TPut(row=row_key.encode("utf8"), columnValues=[column_value])
        self.client.put(self._table_in_bytes(namespace, table_name), put)

    def get(self, namespace, table_name, row_key):
        """Fetch ``row_key`` and return the server's result object."""
        get = TGet(row=row_key.encode("utf8"))
        return self.client.get(self._table_in_bytes(namespace, table_name), get)

    def scan(self, namespace, table_name, start_row, stop_row, batch_size=2):
        """Return every result between ``start_row`` and ``stop_row``.

        Rows are fetched ``batch_size`` at a time with ``getScannerResults``.
        Each later request starts just after the last row already returned.
        Scanning stops when a request returns no rows.

        :raises ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1, got %r" % (batch_size,))
        table_in_bytes = self._table_in_bytes(namespace, table_name)
        stop_row = stop_row.encode("utf8")
        next_start = start_row.encode("utf8")
        results = []
        while True:
            scan = TScan(startRow=next_start, stopRow=stop_row)
            page = self.client.getScannerResults(table_in_bytes, scan, batch_size)
            if not page:
                return results
            results.extend(page)
            # Smallest row key strictly greater than the last row seen.
            next_start = self._create_closest_row_after(page[-1].row)

    def batch_put(self, namespace, table_name, puts=None):
        """Write several rows with one ``putMultiple`` call.

        :param puts: list of ``TPut``, e.g.
            ``[TPut(row=b"row2", columnValues=[TColumnValue(family=b"f",
            qualifier=b"q1", value=b"value2")]), ...]``.
            Defaults to two sample rows in column family ``cf1``.
        """
        if puts is None:
            puts = [
                TPut(row="row1".encode("utf8"),
                     columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                                qualifier="q1".encode("utf8"),
                                                value="value1".encode("utf8"))]),
                TPut(row="row2".encode("utf8"),
                     columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                                qualifier="q2".encode("utf8"),
                                                value="value2".encode("utf8"))]),
            ]
        table_in_bytes = self._table_in_bytes(namespace, table_name)
        self.client.putMultiple(table=table_in_bytes, tputs=puts)

    def batch_get(self, namespace, table_name, gets=None):
        """Fetch several rows with one ``getMultiple`` call and return the results.

        :param gets: list of ``TGet``, e.g. ``[TGet(row=b"row2"), ...]``.
            Defaults to rows ``row1`` and ``row2``.
        """
        if gets is None:
            gets = [TGet(row="row1".encode("utf8")), TGet(row="row2".encode("utf8"))]
        table_in_bytes = self._table_in_bytes(namespace, table_name)
        return self.client.getMultiple(table_in_bytes, gets)

    def get_table_descriptor(self, ns, table):
        """Return ``(table_attr, res)`` describing ``ns:table``.

        ``table_attr`` maps table attribute names to values. ``res`` maps each
        column family name to that family's attributes::

            {
                "cf1": {"attr1": value1, "attr2": value2, ...},
                "cf2": {"attr1": value1, "attr2": value2, ...},
                ...
            }

        All names and values are decoded from UTF-8 bytes to ``str``. Missing
        (None) attribute maps or column lists are treated as empty.
        """
        info = self.client.getTableDescriptor(self._table_name(ns, table))
        if info is None:
            return {}, {}
        res = {
            self._bytes_to_str(family.name): self._decode_attributes(family.attributes)
            for family in (info.columns or [])
        }
        return self._decode_attributes(info.attributes), res

    def delete(self, namespace, table):
        """Disable and then drop the table ``namespace:table``."""
        table_name = self._table_name(namespace, table)
        # disable first, then drop
        self.client.disableTable(table_name)
        self.client.deleteTable(table_name)

    def _decode_attributes(self, attributes):
        """Decode a bytes->bytes attribute map to str->str (None yields {})."""
        return {
            self._bytes_to_str(key): self._bytes_to_str(value)
            for key, value in (attributes or {}).items()
        }

    @staticmethod
    def _bytes_to_str(byte_str):
        """Decode UTF-8 bytes to ``str``; ``str`` input is returned unchanged."""
        if isinstance(byte_str, str):
            return byte_str
        return byte_str.decode("utf-8")

    @staticmethod
    def _table_name(namespace, table_name):
        """Build a ``TTableName`` from str namespace and table names."""
        return TTableName(ns=namespace.encode("utf8"),
                          qualifier=table_name.encode("utf8"))

    @staticmethod
    def _table_in_bytes(namespace, table_name):
        """Return ``b"namespace:table_name"``, the form the data calls take."""
        return "{namespace}:{table_name}".format(
            namespace=namespace, table_name=table_name
        ).encode("utf8")

    @staticmethod
    def _create_closest_row_after(row):
        """Return ``row`` with a trailing 0x00 byte appended."""
        return bytes(row) + b"\x00"


def unit_test():
    """Exercise the demo against a live instance (set ``host`` first)."""
    name_space = "ns6"
    table = "table6"
    column = "cf1"
    # Replace "xxx" with the Thrift2 host of your instance.
    with Demo(host="xxx", port=9090) as client:
        client.create_table(table, column, name_space)
        print(client.table_exists(name_space, table))
        for i in range(10):
            row_key = "row" + str(i)
            client.put(name_space, table, row_key, column, "name",
                       "handsome_boy" + str(i))
        # client.batch_put(name_space, table)
        # scan
        start_row = "row0"
        stop_row = "row8"
        result = client.scan(name_space, table, start_row, stop_row)
        print(result)
        result = client.get(name_space, table, "row4")
        # result = client.batch_get(name_space, table)
        # result = client.get_table_descriptor(name_space, table)
        # client.delete(name_space, table)
        # result = client.table_exists(name_space, table)
        print(result)


if __name__ == "__main__":
    unit_test()