You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

90 lines
3.1 KiB
Python

4 weeks ago
# -*- coding: utf-8 -*-
# @date2024/8/12 11:15
# @AuthorLiuYiJie
# @file phoenix_insert_article
import jaydebeapi
from log_moudle import logger
from queue import Queue
class PhoenixConnectionPool:
def __init__(self, size, driver_driver=None, jdbc_url=None, driver_jars=None, driver_config=None):
self.size = size
self.queue = Queue(maxsize=size)
for _ in range(size):
conn = jaydebeapi.connect(driver_driver, jdbc_url, driver_config, driver_jars)
self.queue.put(conn)
def get_connection(self):
return self.queue.get()
def release_connection(self, conn):
self.queue.put(conn)
def close_all(self):
logger.info('执行结束关闭所有phoenix连接')
while not self.queue.empty():
conn = self.queue.get()
conn.close()
class phoenixServer:
def __init__(self):
# JDBC 驱动路径
# 注意驱动路径中不能包含中文
phoenix_jars = [
r'G:\big_data\software\phoenix\phoenix-client-embedded-hbase-2.5-5.2.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-1.2-api-2.18.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-api-2.18.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-core-2.18.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-slf4j-impl-2.18.0.jar',
]
# Phoenix JDBC URL
jdbc_url = 'jdbc:phoenix:hadoop01:2181:hbase-unsecure;characterEncoding=UTF-8'
driver_config = {'phoenix.schema.isNamespaceMappingEnabled': 'true'}
driver_driver = 'org.apache.phoenix.jdbc.PhoenixDriver'
# 建立连接
# 创建连接池
self.pool = PhoenixConnectionPool(size=2, driver_driver=driver_driver, jdbc_url=jdbc_url,
driver_jars=phoenix_jars, driver_config=driver_config)
# self.conn = jaydebeapi.connect(driver_class, jdbc_url, driver_config, phoenix_jar)
def find(self, sql):
# 创建游标
data_lists = []
conn = self.pool.get_connection()
cursor = conn.cursor()
try:
# 执行查询
cursor.execute(sql)
# 获取结果
results = cursor.fetchall()
for rep in results:
data_lists.append(rep)
finally:
cursor.close()
self.pool.release_connection(conn)
logger.info(data_lists)
return data_lists
def upsert(self, sql, data: list):
conn = self.pool.get_connection()
cursor = conn.cursor()
try:
cursor.executemany(sql, data)
conn.commit()
except:
conn.rollback()
logger.warn('数据插入错误')
finally:
self.pool.release_connection(conn)
cursor.close()
# if __name__ == '__main__':
# c = phoenixServer()
# sql = "select * from SCIENCE.SCIENCE_ARTICLE_METADATA"
# c.find(sql)
# c.pool.close_all()