初始化

master
workwindows 12 months ago
commit fff4cdc299

8
.idea/.gitignore vendored

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/../../../../:\mysubject\IpPool\.idea/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.8 (fast_api)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/SimplePool/log_file.log" charset="GBK" />
</component>
</project>

@ -0,0 +1,17 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="3">
<item index="0" class="java.lang.String" itemvalue="opencv-contrib-python" />
<item index="1" class="java.lang.String" itemvalue="astunparse" />
<item index="2" class="java.lang.String" itemvalue="rich" />
</list>
</value>
</option>
</inspection_tool>
</profile>
</component>

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (fast_api)" project-jdk-type="Python SDK" />
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/IpPool.iml" filepath="$PROJECT_DIR$/.idea/IpPool.iml" />
</modules>
</component>
</project>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @date2023/12/19 13:55
# @AuthorLiuYiJie
# @file __init__.py

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
# @date2023/12/20 14:31
# @AuthorLiuYiJie
# @file client
from SimplePool.ipserver.scheduler import run_task

# Start the scheduler only when executed as a script.  The original called
# run_task() at module level, so merely importing this module blocked forever
# inside asyncio.run() — an import side effect callers cannot opt out of.
if __name__ == '__main__':
    run_task()

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @date2023/12/20 9:33
# @AuthorLiuYiJie
# @file __init__.py

@ -0,0 +1,211 @@
# -*- coding: utf-8 -*-
# @date2023/12/25 16:01
# @AuthorLiuYiJie
# @file ipdb
"""
对ip库里面的代理进行操作
"""
from SimplePool.db.redisdb import AsyncRedisDB
from SimplePool.db.mongodb import AsyncMongoDB
from SimplePool import setting
from typing import Tuple, Union
class ProxyOperate(AsyncRedisDB):
    """Redis-backed operations on the proxy pool.

    Proxies are kept in a sorted set per table, scored between the
    configured min/max detection scores.

    NOTE(review): the names of ``reduce_proxy_score`` and
    ``increase_proxy_score`` are inverted relative to what they actually do
    (+1 and -1 respectively, per their bodies below).  Callers elsewhere may
    depend on the current names, so this is flagged rather than renamed —
    confirm intent before fixing.
    """

    def __init__(self):
        super(ProxyOperate, self).__init__()
        # Score bounds come from settings; a fresh proxy starts at the init
        # score so it can be served once before its first verification.
        self._init_score = setting.PROXY_DETECTION_INIT
        self._max_score = setting.PROXY_DETECTION_MAX
        self._min_score = setting.PROXY_DETECTION_MIN

    async def add_proxy(self, table: str, values):
        """Add proxy member(s) to the sorted set for *table*.

        :param table: redis key of the sorted set
        :param values: a single proxy or a list of proxies
        :return: result of the underlying ZADD
        """
        return await self.zadd(table=table, values=values)

    async def reduce_proxy_score(self, table, values):
        """INCREMENT the score of proxy/proxies by 1 (despite the name).

        Scores are capped at ``self._max_score``; members already at the cap
        are skipped.
        :param table: redis key of the sorted set
        :param values: a single proxy or a list of proxies
        """
        verify_list = list()
        # Do not push a score past the maximum.
        if isinstance(values, list):
            for value in values:
                score = await self.zscore(table, value)
                # Only members below the cap get +1.
                if score < self._max_score:
                    verify_list.append(value)
            return await self.zincrby(table=table, values=verify_list, scores=1)
        else:
            score = await self.zscore(table, values)
            # Only members below the cap get +1.
            if score < self._max_score:
                return await self.zincrby(table=table, values=values, scores=1)

    async def increase_proxy_score(self, table, values):
        """DECREMENT the score of proxy/proxies by 1 (despite the name).

        Only members whose current score lies within [min, max] are touched,
        so a score never drops below ``self._min_score - 1``.
        :param table: redis key of the sorted set
        :param values: a single proxy or a list of proxies
        """
        # Skip members already below the minimum.
        verify_list = list()
        if isinstance(values, list):
            for value in values:
                score = await self.zscore(table, value)
                # -1 only while the score is inside the [min, max] band.
                if self._min_score <= score <= self._max_score:
                    verify_list.append(value)
            return await self.zincrby(table=table, values=verify_list, scores=-1)
        else:
            score = await self.zscore(table, values)
            # -1 only while the score is inside the [min, max] band.
            if self._min_score <= score <= self._max_score:
                return await self.zincrby(table=table, values=values, scores=-1)

    async def get_proxies(self, table, ):
        """Return everything stored under *table* via HGETALL.

        NOTE(review): every other method here treats *table* as a sorted
        set, but this reads it as a hash — presumably a leftover; confirm
        which data type the callers actually use.
        :param table: redis key
        """
        return await self.hgetall(table)

    async def count_all_proxies(self, table):
        """Return the total number of proxies in the sorted set."""
        return await self.zcard(table)

    async def clear_proxies(self, table):
        """Remove proxies whose score fell below the minimum.

        NOTE(review): the lower bound 80 is hard-coded; if
        ``self._min_score`` <= 80 the range [80, min-1] is empty and nothing
        is ever removed — confirm the intended lower bound (probably 0).
        :return: number of removed members
        """
        reNum = await self.zremrangebyscore(table, score_min=80, score_max=self._min_score - 1)
        return reNum

    async def count_score_proxies(self, table, score_min=100, score_max=100) -> int:
        """Count members whose score lies in [score_min, score_max].

        :param table: redis key of the sorted set
        :return: number of members in range
        """
        return await self.zcount(table, score_min=score_min, score_max=score_max)

    async def all_proxies(self, table, score_min=90, score_max=100):
        """Return members scored in [score_min, score_max], highest first."""
        return await self.zrevrangebyscore(table, score_min=score_min, score_max=score_max)
class ProxyMongo(AsyncMongoDB):
    """MongoDB-backed proxy store: insert, update, query and expire proxies."""

    def __init__(self):
        super(ProxyMongo, self).__init__()

    async def add_proxy(self, table: str, values: Union[list, dict]):
        """Insert proxy document(s); existing ones are not duplicated.

        :param table: collection name
        :param values: one proxy dict or a list of them
        :return: (inserted_count, inserted_ids, updated_count)
        """
        batch = values if isinstance(values, list) else [values]
        affect_count, inserted_ids, up_count = await self.add_batch(coll_name=table, datas=batch)
        return affect_count, inserted_ids, up_count

    async def update_proxy(self, table: str, value: dict):
        """Persist a proxy document after verification, matched by its _id.

        :param table: collection name
        :param value: full proxy document (must contain '_id')
        """
        return await self.update(coll_name=table, condition={'_id': value['_id']}, data=value)

    async def get_all_proxies(self, table, condition: dict = None, limit: int = 0):
        """Fetch proxies for verification.

        :param table: collection name
        :param condition: optional filter (defaults to everything)
        :param limit: max number of results, 0 = no limit
        """
        if condition is None:
            condition = {}
        return await self.find(coll_name=table, condition=condition, limit=limit)

    async def get_proxies(self, table, condition: Union[Tuple, dict] = None, display_name: dict = None, limit: int = 1):
        """Return usable proxies (success rate >= 90), randomly sampled.

        :param table: collection name
        :param condition: extra filter (region, time, ...), ANDed with the
            success-rate requirement
        :param display_name: projection of fields to return
        :param limit: number of proxies wanted (defaults to 1)
        """
        projection = {} if display_name is None else display_name
        sample_size = limit or 1
        # Always require a 90%+ verification success rate; AND in any
        # caller-supplied filter.
        if condition:
            query = {"$and": [condition, {"verify_success_rate": {"$gte": 90}}]}
        else:
            query = {"verify_success_rate": {"$gte": 90}}
        results = await self.find_condition(coll_name=table, condition=query, display_name=projection,
                                            limit=sample_size)
        # Nothing matched: fall back to an unfiltered random sample.
        if not results:
            results = await self.find_condition(coll_name=table, display_name=projection, limit=sample_size)
        return results

    async def count_proxies(self, table: str, condition: dict = None) -> int:
        """Count proxies currently stored, optionally filtered.

        :param table: collection name
        :param condition: optional filter
        """
        query = {} if condition is None else condition
        coll = self.get_collection(table)
        return await coll.count_documents(query)

    async def delete_proxies(self, table: str, condition: dict = None):
        """Delete proxies matching *condition* (e.g. past their expiry).

        Refuses to run without a condition so the whole collection is never
        wiped by accident.
        :return: number of deleted documents
        """
        if condition is None:
            raise ValueError("当前删除条件为空error")
        return await self.delete(coll_name=table, condition=condition)

@ -0,0 +1,268 @@
# -*- coding: utf-8 -*-
# @date2023/12/25 16:01
# @AuthorLiuYiJie
# @file mongodb
import asyncio
from urllib import parse
from typing import Union, Tuple
from pymongo.errors import (
DuplicateKeyError, BulkWriteError
)
from motor.motor_asyncio import (
AsyncIOMotorClient,
AsyncIOMotorDatabase,
AsyncIOMotorCollection
)
import SimplePool.setting as setting
from SimplePool.log_code.log import logger
class AsyncMongoDB:
    """Async MongoDB helper built on Motor.

    Creating the client performs no I/O; collection handles are resolved
    per call via :meth:`get_collection`.
    """

    def __init__(
            self,
            host=None,
            port=None,
            db=None,
            username=None,
            password=None,
            url=None,
            loop=None,
            **kwargs
    ):
        """
        :param host/port/db/username/password: fall back to setting.MONGODB_*
        :param url: full "mongodb://..." connection string; overrides host/port
        :param loop: optional event loop handed to Motor
        """
        self._loop = loop
        # Resolve the database NAME first: _get_mongo() reads self._db in
        # both branches.  The original only set it in the non-url branch,
        # so constructing with url= crashed with AttributeError.
        self._db = db or setting.MONGODB_DB
        if url:
            self._get_mongo(url)
        else:
            self._host = host or setting.MONGODB_HOST
            self._port = port or setting.MONGODB_PORT
            self._username = username or setting.MONGODB_USERNAME
            self._password = password or setting.MONGODB_PASSWORD
            self._get_mongo()

    @classmethod
    def from_url(cls, url, **kwargs):
        """Alternate constructor from "mongodb://username:password@host:port"."""
        if parse.urlparse(url).scheme != 'mongodb':
            raise ValueError('url error, please use "mongodb://username:password@host:port"')
        return cls(url=url, **kwargs)

    def _get_mongo(self, url: str = None):
        """Create the Motor client and database handle (no I/O, no await)."""
        loop = self._loop or asyncio.get_event_loop()
        if url:
            self._client: AsyncIOMotorClient = AsyncIOMotorClient(url, io_loop=loop)
        else:
            self._client: AsyncIOMotorClient = AsyncIOMotorClient(self._host, self._port, io_loop=loop)
        # Up to here self._db held the database name; replace it with the
        # actual database handle.
        self._db: AsyncIOMotorDatabase = self._get_database(self._db)

    def _get_database(self, database, **kwargs):
        """Return the database object for the given database name."""
        return self._client.get_database(database, **kwargs)

    def get_collection(self, coll_name) -> AsyncIOMotorCollection:
        """Return the collection object for *coll_name*."""
        return self._db[coll_name]

    async def add(self, coll_name: str, data: dict):
        """Insert a single document.

        :param coll_name: collection name
        :param data: one document, e.g. {'_id': 'xx'}
        :return: 1 on success, None on duplicate key or other failure
        """
        collection = self.get_collection(coll_name)
        affect_count = None
        try:
            await collection.insert_one(data)
        except DuplicateKeyError as dup:
            logger.info(
                """
                error: %s
                """ % dup)
        except Exception as e:
            logger.warning('error: %s' % e)
        else:
            affect_count = 1
        return affect_count

    async def add_batch(self, coll_name: str, datas: list, replace: bool = False):
        """Insert many documents; on unique-index collisions, update instead.

        :param coll_name: collection name
        :param datas: documents, e.g. [{'_id': 'xx'}, ...]
        :param replace: unused, kept for interface compatibility
        :return: (inserted_count, inserted_ids, updated_count)
        """
        collection = self.get_collection(coll_name)
        inserted_ids = []
        affect_count = 0
        up_count = 0
        try:
            affect_count = len(datas)
            result = await collection.insert_many(datas, ordered=False)
        except DuplicateKeyError as dup:
            logger.warning(
                """
                error: %s
                """ % dup)
        except BulkWriteError as bulk_write_e:
            # Inspect the documents that failed to insert.
            write_errors = bulk_write_e.details.get('writeErrors')
            for write_error in write_errors:
                # 11000 == duplicate key on a unique index.
                if write_error.get('code') == 11000:
                    original_doc = write_error.get('op')  # the rejected document
                    # NOTE(review): the lookup key is read from 'third_id' but
                    # filtered as 'ip_id' — confirm the documents carry both.
                    ip_id = original_doc.get('third_id')
                    filter_query = {'ip_id': ip_id}
                    update_query = {'$set': original_doc}
                    up_result = await collection.update_one(filter=filter_query, update=update_query)
                    affect_count -= 1
                    # Accumulate: the original overwrote this on every
                    # duplicate, reporting only the last update.
                    up_count += up_result.modified_count
        except Exception as e:
            logger.error(
                """
                error: %s
                """ % e)
        else:
            inserted_ids = result.inserted_ids
            affect_count = len(inserted_ids)
        return affect_count, inserted_ids, up_count

    async def delete(self, coll_name: str, condition: dict = None):
        """Delete documents matching *condition*.

        :param coll_name: collection name
        :param condition: filter, e.g. {'i': {'$gt': 1000}}; None deletes ALL
        :return: number of deleted documents, or False on error
        """
        if condition is None:
            condition = {}
        collection = self.get_collection(coll_name)
        count = await collection.count_documents(condition)
        try:
            result = await collection.delete_many(condition)
        except Exception as e:
            logger.warning(
                """
                error: %s
                condition: %s
                count: %s
                """ % (e, condition, count))
            return False
        return result.deleted_count

    async def update(self, coll_name: str, data: dict, condition: dict = None, upsert: bool = True):
        """Update one document matching *condition* with ``{'$set': data}``.

        :param coll_name: collection name
        :param condition: filter, e.g. {'i': {'$gt': 1000}}; None matches all
        :param data: plain field dict — the '$set' wrapper is added here
        :param upsert: insert when no document matches
        :return: number of modified documents, or False on error
        """
        if condition is None:
            condition = {}  # empty filter matches every document
        collection = self.get_collection(coll_name)
        try:
            result = await collection.update_one(condition, {'$set': data}, upsert=upsert)
        except Exception as e:
            logger.warning(
                """
                error: %s
                condition: %s
                """ % (e, condition))
            return False
        return result.modified_count

    async def update_batch(self, coll_name: str, datas: dict, condition: dict = None, upsert: bool = True):
        """Update all documents matching *condition* with ``{'$set': datas}``.

        :param coll_name: collection name
        :param datas: plain field dict applied to every matched document
        :param condition: filter; None matches all
        :param upsert: insert when no document matches
        :return: number of modified documents, or False on error
        """
        if condition is None:
            condition = {}  # empty filter matches every document
        collection = self.get_collection(coll_name)
        try:
            result = await collection.update_many(condition, {'$set': datas}, upsert=upsert)
        except Exception as e:
            logger.warning(
                """
                error: %s
                condition: %s
                """ % (e, condition))
            return False
        return result.modified_count

    async def find(self, coll_name: str, condition: Union[Tuple, dict] = None, display_name: dict = None,
                   limit: int = 0, **kwargs):
        """Query documents.

        :param coll_name: collection name
        :param condition: filter, e.g. {'i': {'$lt': 4}}
        :param display_name: projection of fields to return
        :param limit: 1 -> find_one; >1 -> first *limit* docs; 0 -> all
        :return: list of documents (possibly empty)
        """
        condition = {} if condition is None else condition
        display_name = {} if display_name is None else display_name
        collection = self.get_collection(coll_name)
        results = []
        if limit == 1:
            doc = await collection.find_one(condition, display_name)
            # The original appended even when nothing matched, returning the
            # truthy-but-useless [None]; skip missing documents instead.
            if doc is not None:
                results.append(doc)
            return results
        elif limit > 1:
            cursor = collection.find(condition, display_name)
            for document in await cursor.to_list(length=limit):
                results.append(document)
        else:
            find_results = collection.find(condition, display_name)
            async for document in find_results:
                results.append(document)
        return results

    async def find_condition(self, coll_name: str, condition: Union[Tuple, dict] = None, display_name: dict = None,
                             limit: int = 0, **kwargs):
        """Randomly sample *limit* documents matching *condition*.

        :param coll_name: collection name
        :param condition: $match filter
        :param display_name: $project specification
        :param limit: sample size
        :return: list of sampled documents

        NOTE(review): an empty $project stage is rejected by MongoDB —
        callers appear to always pass a projection; confirm.
        """
        condition = {} if condition is None else condition
        projection = {} if display_name is None else display_name
        pipeline = [
            {'$match': condition},
            {"$project": projection},
            {"$sample": {"size": limit}}
        ]
        collection = self.get_collection(coll_name)
        results = await collection.aggregate(pipeline).to_list(limit)
        return results

    async def count(self, coll_name: str, condition: dict):
        """Count documents matching *condition*.

        :param coll_name: collection name
        :param condition: filter, e.g. {'i': {'$gt': 1000}}; None counts all
        """
        condition = {} if condition is None else condition
        collection = self.get_collection(coll_name)
        count = await collection.count_documents(condition)
        return count

    async def command(self, command):
        """Run a raw database command against the current database."""
        return await self._db.command(command)

@ -0,0 +1,431 @@
# -*- coding: utf-8 -*-
# @date2023/12/20 9:33
# @AuthorLiuYiJie
# @file redis
import redis
import aioredis
import SimplePool.setting as setting
from typing import Union, List, Any, TypeVar
__all__ = (
"redisDB",
"AsyncRedisDB"
)
_RedisT = TypeVar('_RedisT', bound='RedisDB')
_AioRedisT = TypeVar('_AioRedisT', bound='AsyncRedisDB')
ZScoreT = Union[float, str]
class redisDB:
    """Small synchronous redis helper (connection pool + a few commands)."""

    def __init__(self, host=None, port=None, db=None, password=None):
        self.host = host or setting.REDIS_HOST
        self.port = port or setting.REDIS_PORT
        self.db = db or setting.REDIS_DB
        self.password = password or setting.REDIS_PASSWORD
        self._redis = None
        self._pool = None
        self.get_redis()

    def get_redis(self):
        """Create the connection pool and the client bound to it."""
        self._pool = redis.ConnectionPool(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            decode_responses=True
        )
        self._redis = redis.Redis(connection_pool=self._pool, decode_responses=True)

    @property
    def redis(self) -> redis.Redis:
        return self._redis

    def hset(self, name, datas):
        """Store key/value pairs into the hash *name* atomically.

        :param name: redis hash key
        :param datas: a dict or an iterable of (key, value) pairs
        :return: list of per-command results from the pipeline

        Fixes two defects in the original: it called pipe.multi() a second
        time instead of pipe.execute(), so nothing was ever sent to redis,
        and iterating a dict directly yielded keys only, breaking the
        ``for key, value in datas`` unpack.
        """
        pipe = self.redis.pipeline()
        pipe.multi()
        pairs = datas.items() if isinstance(datas, dict) else datas
        for key, value in pairs:
            pipe.hset(name, key, value)
        return pipe.execute()

    def keys(self, pattern: str = '*') -> list:
        """Return all keys matching *pattern*."""
        return self.redis.keys(pattern=pattern)
class AsyncRedisDB:
    """Async redis helper (aioredis); the connection is created lazily on
    first use instead of in __init__."""

    def __init__(
            self,
            host=None,
            port: int = 6379,
            db: int = 0,
            password: str = None
    ):
        self._host = host or setting.REDIS_HOST
        self._port = port or setting.REDIS_PORT
        self._db = db or setting.REDIS_DB
        self._password = password or setting.REDIS_PASSWORD
        self.__redis: Union[aioredis.Redis, None] = None

    @property
    def _redis(self):
        # Lazy connect on first access.
        if self.__redis is None:
            self.get_connect()
        return self.__redis

    @_redis.setter
    def _redis(self, val):
        self.__redis = val

    def get_connect(self):
        """Create the aioredis client (no I/O happens here)."""
        self._redis = aioredis.Redis(
            host=self._host,
            port=self._port,
            db=self._db,
            password=self._password,
            decode_responses=True
        )

    async def sadd(self, table, values) -> Union[int, None]:
        """Add value(s) to an unordered set (used for de-duplication).

        :param table: redis key
        :param values: one value or a list of values
        :return: for a single value, 0 if it already existed and 1 if it was
            added; for a list, None (the pipeline results are discarded)
        """
        if isinstance(values, list):
            pipe = self._redis.pipeline()
            pipe.multi()
            for value in values:
                await pipe.sadd(table, value)
            await pipe.execute()
        else:
            return await self._redis.sadd(table, values)

    async def hset(self, table, values):
        """Store a mapping into the hash *table*.

        :param values: dict of field -> value
        """
        return await self._redis.hset(table, mapping=values)

    async def hgetall(self, table):
        """Return the full hash stored at *table*."""
        return await self._redis.hgetall(table)

    async def scard(self, table) -> int:
        """Return the number of members of the set *table* (like len)."""
        return await self._redis.scard(table)

    async def zcard(self, table) -> int:
        """Return the number of members of the sorted set *table*."""
        return await self._redis.zcard(table)

    async def zcount(self, table, score_min: ZScoreT, score_max: ZScoreT) -> int:
        """Count sorted-set members scored within [score_min, score_max]."""
        return await self._redis.zcount(table, min=score_min, max=score_max)

    async def sismember(self, table, value) -> bool:
        """Membership test on the set *table* (like ``in``)."""
        return await self._redis.sismember(table, value)

    async def srem(self, table, values):
        """Remove value(s) from the set *table*.

        :param values: one value or a list of values
        """
        if isinstance(values, list):
            pipe = self._redis.pipeline()
            pipe.multi()
            for value in values:
                await pipe.srem(table, value)
            # The original queued the deletes but never ran the pipeline,
            # so list input removed nothing.
            return await pipe.execute()
        else:
            return await self._redis.srem(table, values)

    async def zadd(self, table, values, scores=0):
        """Add member(s) to the sorted set *table* with the given score(s).

        :param values: one member or a list of members
        :param scores: one score or a score per member
        """
        if isinstance(values, list):
            # The score list must match the value list in length.
            if not isinstance(scores, list):
                scores = [scores] * len(values)
            else:
                assert len(values) == len(scores)
            pipe = self._redis.pipeline()
            pipe.multi()
            for value, score in zip(values, scores):
                await pipe.execute_command(
                    'ZADD', table, score, value
                )
            return await pipe.execute()
        else:
            return await self._redis.execute_command(
                'ZADD', table, scores, values)

    async def zincrby(self, table, values, scores=1):
        """Increment the score of member(s) in the sorted set *table*.

        :param values: one member or a list of members
        :param scores: one increment or an increment per member
        """
        if isinstance(values, list):
            # The score list must match the value list in length.
            if not isinstance(scores, list):
                scores = [scores] * len(values)
            else:
                assert len(values) == len(scores)
            pipe = self._redis.pipeline()
            pipe.multi()
            for value, score in zip(values, scores):
                await pipe.execute_command("ZINCRBY", table, score, value)
            return await pipe.execute()
        else:
            return await self._redis.execute_command(
                'ZINCRBY', table, scores, values)

    async def zscore(self, table, values):
        """Return the score(s) of member(s) of the sorted set *table*."""
        if isinstance(values, list):
            pipe = self._redis.pipeline()
            pipe.multi()
            for value in values:
                await pipe.execute_command("ZSCORE", table, value)
            return await pipe.execute()
        else:
            return await self._redis.execute_command("ZSCORE", table, values)

    async def zexists(
            self, name: str,
            values: Union[List[Any], Any]
    ) -> Union[List[bool], bool]:
        """Test membership in the sorted set via ZSCORE (non-None => exists).

        :param name: redis key
        :param values: one member or a list of members
        :return: 1/0 per member (list in, list out)
        """
        is_exists = []
        if isinstance(values, list):
            # pipeline() is synchronous everywhere else in this class; the
            # original awaited it here, which fails on aioredis 2.x.
            pipe = self._redis.pipeline()
            pipe.multi()
            for value in values:
                pipe.zscore(name, value)
            score_results = await pipe.execute()
            for is_exist in score_results:
                if is_exist is not None:
                    is_exists.append(1)
                else:
                    is_exists.append(0)
        else:
            score_results = await self._redis.zscore(name, values)
            is_exists = 1 if score_results is not None else 0
        return is_exists

    async def zrange(self):
        # The original raised NotImplemented (a constant, not an exception),
        # which itself raises TypeError; NotImplementedError is the correct
        # "not implemented yet" sentinel.
        raise NotImplementedError

    async def zrem(self, name, values):
        """Remove member(s) from the sorted set *name*.

        :param values: one member or a list of members
        """
        if isinstance(values, list):
            await self._redis.zrem(name, *values)
        else:
            await self._redis.zrem(name, values)

    async def zremrangebyscore(
            self, name: str, score_min: ZScoreT, score_max: ZScoreT
    ) -> int:
        """Remove members scored within [score_min, score_max].

        https://www.runoob.com/redis/sorted-sets-zremrangebyscore.html
        :return: number of removed members
        """
        removed = await self._redis.zremrangebyscore(
            name=name, min=score_min, max=score_max
        )
        return removed

    async def zrangebyscore(
            self, name: str, score_min: ZScoreT, score_max: ZScoreT, count: int = None,
            **kwargs
    ):
        """Return members scored within [score_min, score_max], ascending.

        :param count: if given, return at most *count* members (offset 0)
        """
        if count is None:
            result = await self._redis.zrangebyscore(
                name, min=score_min, max=score_max,
                **kwargs
            )
        else:
            result = await self._redis.zrangebyscore(
                name, min=score_min, max=score_max, start=0, num=count,
                **kwargs
            )
        return result

    async def zrevrangebyscore(
            self, name: str, score_min: ZScoreT, score_max: ZScoreT, count: int = None,
            **kwargs
    ):
        """Return members scored within the range, descending (reverse of
        :meth:`zrangebyscore`).

        :param score_min: lower bound of the score band
        :param score_max: upper bound of the score band
        :param count: if given, return at most *count* members (offset 0)
        """
        if count is None:
            result = await self._redis.zrevrangebyscore(
                name, min=score_max, max=score_min,
                **kwargs
            )
        else:
            result = await self._redis.zrevrangebyscore(
                name, min=score_max, max=score_min, start=0, num=count,
                **kwargs
            )
        return result

    async def zrangebyscore_set_score(
            self, name, score_min: ZScoreT, score_max: ZScoreT, new_score, count: int = None
    ) -> list:
        """Atomically fetch members in a score band and reset their score.

        Runs as a Lua script so read+update cannot interleave with other
        clients.
        :return: the members that were fetched (and re-scored)
        """
        lua = """
            -- local key = KEYS[1]
            local min_score = ARGV[1]
            local max_score = ARGV[2]
            local set_score = ARGV[3]
            local count = ARGV[4]
            -- 取值
            local datas = nil
            if count then
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count)
            else
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores')
            end
            local real_datas = {} -- 数据
            --修改优先级
            for i=1, #datas, 2 do
                local data = datas[i]
                local score = datas[i+1]
                table.insert(real_datas, data) -- 添加数据
                redis.call('zincrby', KEYS[1], set_score - score, datas[i])
            end
            return real_datas
        """
        cmd = self._redis.register_script(lua)
        if count:
            res = await cmd(keys=[name], args=[score_min, score_max, new_score, count])
        else:
            res = await cmd(keys=[name], args=[score_min, score_max, new_score])
        return res

    async def zpop(
            self, name, start: int = None, end: int = None, count: int = 1,
            desc: bool = False, remove: bool = True
    ) -> List[Any]:
        """Fetch (and optionally remove) members of *name* by index range.

        :param name: redis key
        :param start: start index (not a score); defaults to 0
        :param end: end index, inclusive; derived from *count* when omitted
        :param count: number of members when start/end are omitted
        :param desc: sort descending instead of ascending
        :param remove: also remove the fetched members (default True)
        :return: the fetched members
        """
        if start is None:
            start = 0
        if end is None:
            end = count - 1 if count > 0 else count
        pipe = self._redis.pipeline()
        pipe.multi()
        await pipe.zrange(name=name, start=start, end=end, desc=desc)
        if remove:
            await pipe.zremrangebyrank(name=name, min=start, max=end)
        exec_results = await pipe.execute()
        # First reply is the ZRANGE result; the removed-count reply only
        # exists when remove=True.  The original unconditionally unpacked
        # two replies and crashed for remove=False.
        return exec_results[0]

    async def rpush(self, table, values):
        """Append value(s) to the list *table*.

        :param values: one value or a list of values
        """
        if isinstance(values, list):
            pipe = self._redis.pipeline()
            pipe.multi()
            for value in values:
                await pipe.rpush(table, value)
            await pipe.execute()
        else:
            return await self._redis.rpush(table, values)

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @date2023/12/20 13:36
# @AuthorLiuYiJie
# @file __init__.py

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
# @date2023/12/28 17:41
# @AuthorLiuYiJie
# @file agent_store
"""
爬虫模块用力增加代理商获取ip
"""
import random
import json
from SimplePool.ipserver.crawler import downLoader
from SimplePool.log_code.log import logger
import datetime
class SpiderMeta(type):
    """Metaclass that registers every proxy-provider spider class.

    Any class created with this metaclass must supply a truthy ``run``
    attribute, otherwise a ValueError is raised; successfully created
    classes are collected in ``SpiderMeta.spiders`` for later discovery.
    """

    # Registry of all provider classes, in definition order.
    spiders = []

    def __new__(mcs, name, bases, attrs, **kwargs):
        """Validate the class body, build the class, register it.

        :param name: class name
        :param bases: base classes
        :param attrs: class namespace (must contain a truthy ``run``)
        """
        if not attrs.get('run', ''):
            raise ValueError(f'当前{name}类无run方法 请实现')
        created = super().__new__(mcs, name, bases, attrs, **kwargs)
        SpiderMeta.spiders.append(created)
        return created
class ZhiMaSpider(metaclass=SpiderMeta):
    """Provider spider for the "Zhima" proxy vendor.

    NOTE(review): ``start_url`` points at icanhazip.com, which returns a
    bare IP address, not JSON — so ``json.loads`` below will fail against
    the real URL, and the hard-coded ``data`` list is clearly placeholder
    test data.  Confirm the real vendor endpoint before relying on this.
    """

    # Placeholder endpoint (see class note).
    start_url = 'https://icanhazip.com/'
    # Human-readable vendor name stamped onto every stored proxy.
    source_name = '芝麻代理'
    # Vendor error codes mapped to their meaning (runtime data — left as-is).
    codes = {
        111: "提取链接请求太过频繁,超出限制,请在1秒后再次请求",
        113: "白名单未添加/白名单掉了,请设置为白名单!",
        114: "余额不足",
        115: "没有资源或没有符合条件的数据,请更换地区等条件重新生成api链接地址",
        116: "您的套餐今日已到达上限!",
        117: "您的套餐pack传参有误!请检测您现在的ip是否在套餐所在账户",
        118: "您的账户异常,请联系客服!账户处于被禁用状态",
        121: "您的该套餐已经过期了!",
        401: "白名单错误/使用的IP已经过期",
        403: "客户目标网站异常,联系客服处理",
    }

    async def run(self):
        """Fetch the vendor response and normalise it into proxy documents.

        :return: list of proxy dicts on success, None on a vendor error
            (the error is logged via the ``codes`` table).
        """
        content = await downLoader.download(self.start_url)
        # Response arrives as bytes; strip and decode before parsing.
        results_b = content.strip().decode('utf-8')
        results = json.loads(results_b)
        # code == 0 presumably means success (0 is absent from ``codes``) —
        # confirm against the vendor API docs.
        if results['code'] == 0:
            proxies = list()
            # Hard-coded sample payload standing in for results['data'].
            data = [
                {
                    "ip": "49.68.68.197",
                    "port": random.randint(1111, 6000),
                    "expire_time": "2019-05-24 08:58:31",
                    "city": "徐州市",
                    "isp": "电信"
                },
                {
                    "ip": "58.218.201.108",
                    "port": random.randint(1111, 6000),
                    "expire_time": "2019-05-24 08:55:31",
                    "city": "苏州市",
                    "isp": "电信",
                    "outip": "219.136.47.161",
                }
            ]
            # data = content['data']
            for oneIp in data:
                # 'ip:port' doubles as the unique proxy identifier.
                proxy = f"{oneIp['ip']}:{oneIp['port']}"
                # Normalised document shape shared with the other spiders;
                # verification counters start at zero.
                detail = {'ip_id': proxy, 'city': oneIp.get('city', ''), 'isp': oneIp.get('isp', ''), 'verify_num': 0,
                          'verify_success_rate': 0, 'verify_success_num': 0, 'verify_error_num': 0,
                          'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                          'verify_time': '', "next_verify_time": "", 'expire_time': oneIp.get('expire_time', ''),
                          'proxy_source': self.source_name}
                proxies.append(detail)
            # Caller is expected to verify these before serving them.
            return proxies
        else:
            logger.warning(f'提取{self.source_name}错误, 错误原因为{self.codes.get(results["code"])}')
class Daili666Spider(metaclass=SpiderMeta):
    """Provider spider for the "66" proxy vendor (placeholder data)."""

    # Placeholder endpoint; the real vendor response is stubbed below.
    start_url = 'https://icanhazip.com/'
    source_name = '66代理'

    async def run(self):
        """Fetch the vendor endpoint and normalise proxies into documents.

        :return: list of proxy dicts with zeroed verification counters
        """
        content = await downLoader.download(self.start_url)
        # Stubbed payload standing in for the parsed vendor response.
        data = [
            {
                "ip": "49.68.68.197",
                "port": random.randint(1111, 6000),
                "expire_time": "2019-05-24 08:58:31",
                "city": "南京市",
                "isp": "电信"
            },
            {
                "ip": "58.218.201.108",
                "port": random.randint(1111, 6000),
                "expire_time": "2019-05-24 08:55:31",
                "city": "上海市",
                "isp": "移动",
                "outip": "219.136.47.161",
            }
        ]
        # data = content['data']
        proxies = []
        for record in data:
            # 'ip:port' doubles as the unique proxy identifier.
            identifier = f"{record['ip']}:{record['port']}"
            proxies.append({
                'ip_id': identifier,
                'city': record.get('city', ''),
                'isp': record.get('isp', ''),
                'verify_num': 0,
                'verify_success_rate': 0,
                'verify_success_num': 0,
                'verify_error_num': 0,
                'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'verify_time': '',
                "next_verify_time": "",
                'expire_time': record.get('expire_time', ''),
                'proxy_source': self.source_name,
            })
        # Caller is expected to verify these before serving them.
        return proxies
#
#
# class KuaidailiSpider(metaclass=SpiderMeta):
# start_url = 'http://www.kuaidaili.com/free/inha/{}/'
#
# async def run(self, page_total=2):
# # urls = [self.start_url.format(i)
# # for i in range(self._counter, self._counter + page_total)]
# # self.increment(page_total)
# ans = []
# return self.start_url
#
#
# class XiciSpider(metaclass=SpiderMeta):
# start_url = 'http://www.xicidaili.com/nn/{}'
#
# async def run(self, page_total=2):
# # urls = [self.start_url.format(i)
# # for i in range(self._counter, self._counter + page_total)]
# # self.increment(page_total)
# ans = []
# return self.start_url
# spiders = [cls() for cls in SpiderMeta.spiders]
# for spider in spiders:
# print(spider.run())

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# @date2023/12/29 10:16
# @AuthorLiuYiJie
# @file crawler
"""
请求代理返回ip
"""
import aiohttp
import datetime
import asyncio
from SimplePool.log_code.log import logger
class Downloader:
    """Minimal aiohttp-based fetcher with a best-effort error fallback."""

    @staticmethod
    async def fetch(url):
        """GET *url* and return the raw body bytes.

        On any request failure the error is logged and a JSON error payload
        is returned instead of raising, so callers always get bytes back.
        """
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False), trust_env=True) as session:
            try:
                async with session.request(method='get', url=url, timeout=5) as response:
                    content = await response.read()
            except Exception as e:
                logger.error(f'{url}请求出错, {e}')
                # Double-quoted so callers can json.loads() it; the original
                # single-quoted payload was not valid JSON.  (Also dropped:
                # a leftover debug print, and a `finally: return` that
                # swallowed even cancellation/KeyboardInterrupt.)
                content = b'{"status": 1000, "data": []}'
            return content

    async def download(self, url):
        """Convenience instance wrapper around :meth:`fetch`."""
        content = await self.fetch(url)
        return content


# Shared module-level singleton used by the spiders.
downLoader = Downloader()

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
# @date2023/12/20 13:58
# @AuthorLiuYiJie
# @file getip
"""
请求ip,存入redis
"""
import aiohttp
import asyncio
import datetime
import time
import json
from SimplePool.log_code.log import logger
from SimplePool.db.ipdb import ProxyMongo
import SimplePool.setting as setting
from SimplePool.ipserver.agent_store import SpiderMeta
class GetIp:
    """Fetch proxies from every registered provider and store them in Mongo."""

    def __init__(self):
        self._db = ProxyMongo()
        # Count of proxies inserted during this run.
        self.proxy_num = 0
        self.proxy_num_min = setting.PROXY_NUMBER_MIN

    async def ip_count(self):
        """Return True when the pool holds more than the configured minimum
        of proxies with a verification success rate of 90%+."""
        condition = {"verify_success_rate": {"$gte": 90}}
        return await self._db.count_proxies(table=setting.MONGODB_COLL,
                                            condition=condition) > self.proxy_num_min

    async def run(self):
        """Wait for the pool condition, then fan out to all spiders and
        insert whatever they return.

        NOTE(review): the loop below only proceeds to fetching once the pool
        is ALREADY above the minimum, and spins forever otherwise — the
        condition looks inverted relative to the method names; confirm the
        intended trigger before changing it.
        """
        # Fresh verification pass starts with each fetch cycle.
        logger.info(f'开始请求获取ip.....')
        # Instantiate every registered provider spider.
        spiders = [cls() for cls in SpiderMeta.spiders]
        while True:
            available_count = await self.ip_count()
            if available_count:
                logger.info(f'当前库里分数90以上大于40本次获取{self.proxy_num}条代理')
                break
            # await asyncio.sleep, not time.sleep: the original blocked the
            # whole event loop (and every other scheduled job) while polling.
            await asyncio.sleep(0.5)
        tasks = []
        for spider in spiders:
            task = asyncio.create_task(spider.run())
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        for result in results:
            affect_count, inserted_ids, up_count = await self._db.add_proxy(table=setting.MONGODB_COLL, values=result)
            self.proxy_num += affect_count

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# @date2023/12/20 13:36
# @AuthorLiuYiJie
# @file ipapi
import uvicorn
import base64
from fastapi import FastAPI, Request, Query
from SimplePool.db.ipdb import ProxyMongo
import SimplePool.setting as setting
app = FastAPI()
# NOTE(review): FastAPI/Starlette expects a boolean here; the string 'debug'
# is truthy so it behaves like True, but it is presumably meant to be a bool
# (or removed) — confirm.
app.debug = 'debug'
def get_conn():
    """Create a fresh Mongo-backed proxy store for one request."""
    return ProxyMongo()
@app.get('/')
async def index():
    """Landing/health endpoint."""
    greeting = "Welcome to SimplePool"
    return greeting
@app.get('/getIp/v1/')
async def get_ip(num: int = 0):
    """Return up to *num* proxy addresses as 'ip:port' strings.

    :param num: number of proxies wanted (the store falls back to 1 for 0)
    """
    store = get_conn()
    docs = await store.get_proxies(setting.MONGODB_COLL, display_name={'_id': 1}, limit=num)
    return [doc['_id'] for doc in docs]
@app.get('/getAreaIp/v1/')
async def get_area(place: str = Query(...), num: int = 0):
    """Return up to *num* proxy addresses whose city matches *place*.

    :param place: substring matched against the stored city via $regex
    :param num: number of proxies wanted (the store falls back to 1 for 0)
    """
    store = get_conn()
    city_filter = {'city': {"$regex": place}}
    docs = await store.get_proxies(setting.MONGODB_COLL, condition=city_filter,
                                   display_name={'_id': 1}, limit=num)
    return [doc['_id'] for doc in docs]
if __name__ == '__main__':
    # Serve the API with uvicorn when this module is executed directly.
    uvicorn.run('ipapi:app', host='0.0.0.0', port=8080)

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
# @date2023/12/27 16:53
# @AuthorLiuYiJie
# @file scheduler
"""
调度定时执行任务
"""
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from SimplePool.ipserver.getip import GetIp
from SimplePool.ipserver.verifyip import VerifyProxy
from SimplePool.log_code.log import logger
def configure_scheduler():
    """Build the AsyncIO scheduler with both periodic jobs attached:
    proxy verification every 22s and proxy fetching every 11s."""
    sched = AsyncIOScheduler()
    sched.add_job(run_VerifyProxy, 'interval', seconds=22, misfire_grace_time=1)
    sched.add_job(run_GetIp, 'interval', seconds=11, misfire_grace_time=2)
    return sched
async def run_VerifyProxy():
    # Job entry point: run one verification pass over the stored proxies.
    await VerifyProxy().run()
async def run_GetIp():
    # Job entry point: run one fetch cycle against all provider spiders.
    await GetIp().run()
def start_scheduler(scheduler):
    # Start the (already configured) scheduler and log the fact.
    scheduler.start()
    logger.info('Scheduler started')
async def run_forever():
    """Configure and start the scheduler, then park the coroutine forever.

    The Event is never set, so wait() only returns via cancellation.
    NOTE(review): KeyboardInterrupt is typically raised in and re-raised by
    asyncio.run() rather than surfacing inside this coroutine, so this
    except clause may never fire — confirm shutdown behaviour on Ctrl-C.
    """
    scheduler = configure_scheduler()
    start_scheduler(scheduler)
    try:
        await asyncio.Event().wait()
    except (KeyboardInterrupt, SystemExit):
        logger.info('Stopping Scheduler...')
        scheduler.shutdown()
def run_task():
    # Blocking entry point: spins up an event loop and runs the scheduler
    # until the process is interrupted.
    logger.info('Starting Scheduled Tasks')
    asyncio.run(run_forever())

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# @date2023/12/20 16:12
# @AuthorLiuYiJie
# @file verifyip
import aiohttp
import asyncio
from datetime import datetime, timedelta
from SimplePool.db.ipdb import ProxyMongo
import SimplePool.setting as setting
from SimplePool.log_code.log import logger
from typing import Union, List
class VerifyProxy:
def __init__(self):
self._db = ProxyMongo()
self.concurrency = 10
self.verify_counts = 0
self.verify_success_counts = 0
self.semaphore = asyncio.Semaphore(value=self.concurrency)
async def fetch(self, proxies: dict):
async with self.semaphore:
proxy = proxies['ip_id']
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False), trust_env=True) as session:
try:
async with session.request('get', url=setting.VERIFY_BASE_URL, proxy='', timeout=3) as response:
res = await response.json()
ress = response.status
res = {'status': 200, 'host': '49.68.68.197', 'port': ''}
except Exception as e: