From 71e511c5c763dc18457940cf043a194b61384e23 Mon Sep 17 00:00:00 2001 From: workwindows Date: Wed, 21 Aug 2024 16:00:13 +0800 Subject: [PATCH] =?UTF-8?q?phoenix=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hbase_server/phoenix_operate.py | 89 +++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 hbase_server/phoenix_operate.py diff --git a/hbase_server/phoenix_operate.py b/hbase_server/phoenix_operate.py new file mode 100644 index 0000000..49f56b3 --- /dev/null +++ b/hbase_server/phoenix_operate.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# @date:2024/8/12 11:15 +# @Author:LiuYiJie +# @file: phoenix_insert_article +import jaydebeapi +from log_moudle import logger +from queue import Queue + + +class PhoenixConnectionPool: + def __init__(self, size, driver_driver=None, jdbc_url=None, driver_jars=None, driver_config=None): + self.size = size + self.queue = Queue(maxsize=size) + for _ in range(size): + conn = jaydebeapi.connect(driver_driver, jdbc_url, driver_config, driver_jars) + self.queue.put(conn) + + def get_connection(self): + return self.queue.get() + + def release_connection(self, conn): + self.queue.put(conn) + + def close_all(self): + logger.info('执行结束,关闭所有phoenix连接') + while not self.queue.empty(): + conn = self.queue.get() + conn.close() + + +class phoenixServer: + def __init__(self): + # JDBC 驱动路径 + # 注意驱动路径中不能包含中文 + phoenix_jars = [ + r'G:\big_data\software\phoenix\phoenix-client-embedded-hbase-2.5-5.2.0.jar', + r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-1.2-api-2.18.0.jar', + r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-api-2.18.0.jar', + r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-core-2.18.0.jar', + r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-slf4j-impl-2.18.0.jar', + ] + # Phoenix JDBC URL + jdbc_url = 'jdbc:phoenix:hadoop01:2181:hbase-unsecure;characterEncoding=UTF-8' + driver_config = {'phoenix.schema.isNamespaceMappingEnabled': 'true'} + driver_driver = 'org.apache.phoenix.jdbc.PhoenixDriver' + # 建立连接 + # 创建连接池 + self.pool = PhoenixConnectionPool(size=2, driver_driver=driver_driver, jdbc_url=jdbc_url, + driver_jars=phoenix_jars, driver_config=driver_config) + + # self.conn = jaydebeapi.connect(driver_class, jdbc_url, driver_config, phoenix_jar) + + def find(self, sql): + # 创建游标 + data_lists = [] + conn = self.pool.get_connection() + cursor = conn.cursor() + try: + # 执行查询 + cursor.execute(sql) + # 获取结果 + results = cursor.fetchall() + for rep in results: + data_lists.append(rep) + finally: + cursor.close() + self.pool.release_connection(conn) + logger.info(data_lists) + return data_lists + + def upsert(self, sql, data: list): + conn = self.pool.get_connection() + cursor = conn.cursor() + try: + cursor.executemany(sql, data) + conn.commit() + except: + conn.rollback() + logger.warn('数据插入错误') + finally: + self.pool.release_connection(conn) + cursor.close() + + +# if __name__ == '__main__': +# c = phoenixServer() +# sql = "select * from SCIENCE.SCIENCE_ARTICLE_METADATA" +# c.find(sql) +# c.pool.close_all()