You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

90 lines
3.1 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# @date2024/8/12 11:15
# @AuthorLiuYiJie
# @file phoenix_insert_article
import jaydebeapi
from log_moudle import logger
from queue import Queue
class PhoenixConnectionPool:
def __init__(self, size, driver_driver=None, jdbc_url=None, driver_jars=None, driver_config=None):
self.size = size
self.queue = Queue(maxsize=size)
for _ in range(size):
conn = jaydebeapi.connect(driver_driver, jdbc_url, driver_config, driver_jars)
self.queue.put(conn)
def get_connection(self):
return self.queue.get()
def release_connection(self, conn):
self.queue.put(conn)
def close_all(self):
logger.info('执行结束关闭所有phoenix连接')
while not self.queue.empty():
conn = self.queue.get()
conn.close()
class phoenixServer:
def __init__(self):
# JDBC 驱动路径
# 注意驱动路径中不能包含中文
phoenix_jars = [
r'G:\big_data\software\phoenix\phoenix-client-embedded-hbase-2.5-5.2.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-1.2-api-2.18.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-api-2.18.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-core-2.18.0.jar',
r'G:\big_data\software\phoenix\phoenix-hbase-2.5-5.2.0-bin\lib\log4j-slf4j-impl-2.18.0.jar',
]
# Phoenix JDBC URL
jdbc_url = 'jdbc:phoenix:hadoop01:2181:hbase-unsecure;characterEncoding=UTF-8'
driver_config = {'phoenix.schema.isNamespaceMappingEnabled': 'true'}
driver_driver = 'org.apache.phoenix.jdbc.PhoenixDriver'
# 建立连接
# 创建连接池
self.pool = PhoenixConnectionPool(size=2, driver_driver=driver_driver, jdbc_url=jdbc_url,
driver_jars=phoenix_jars, driver_config=driver_config)
# self.conn = jaydebeapi.connect(driver_class, jdbc_url, driver_config, phoenix_jar)
def find(self, sql):
# 创建游标
data_lists = []
conn = self.pool.get_connection()
cursor = conn.cursor()
try:
# 执行查询
cursor.execute(sql)
# 获取结果
results = cursor.fetchall()
for rep in results:
data_lists.append(rep)
finally:
cursor.close()
self.pool.release_connection(conn)
logger.info(data_lists)
return data_lists
def upsert(self, sql, data: list):
conn = self.pool.get_connection()
cursor = conn.cursor()
try:
cursor.executemany(sql, data)
conn.commit()
except:
conn.rollback()
logger.warn('数据插入错误')
finally:
self.pool.release_connection(conn)
cursor.close()
# if __name__ == '__main__':
# c = phoenixServer()
# sql = "select * from SCIENCE.SCIENCE_ARTICLE_METADATA"
# c.find(sql)
# c.pool.close_all()