commit fff4cdc299083b5814510bcff34a030b2f33eae2 Author: workwindows Date: Mon Jan 22 09:48:26 2024 +0800 初始化 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..d704819 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/../../../../:\mysubject\IpPool\.idea/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/IpPool.iml b/.idea/IpPool.iml new file mode 100644 index 0000000..0280c05 --- /dev/null +++ b/.idea/IpPool.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..28c4456 --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..71f4218 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,17 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..9b073fc --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..b3aa74e --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/SimplePool/__init__.py 
b/SimplePool/__init__.py new file mode 100644 index 0000000..38d0b52 --- /dev/null +++ b/SimplePool/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/19 13:55 +# @Author:LiuYiJie +# @file: __init__.py diff --git a/SimplePool/__pycache__/__init__.cpython-38.pyc b/SimplePool/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..6500983 Binary files /dev/null and b/SimplePool/__pycache__/__init__.cpython-38.pyc differ diff --git a/SimplePool/__pycache__/setting.cpython-38.pyc b/SimplePool/__pycache__/setting.cpython-38.pyc new file mode 100644 index 0000000..941c424 Binary files /dev/null and b/SimplePool/__pycache__/setting.cpython-38.pyc differ diff --git a/SimplePool/client.py b/SimplePool/client.py new file mode 100644 index 0000000..e44deda --- /dev/null +++ b/SimplePool/client.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/20 14:31 +# @Author:LiuYiJie +# @file: client +from SimplePool.ipserver.scheduler import run_task + +run_task() diff --git a/SimplePool/db/__init__.py b/SimplePool/db/__init__.py new file mode 100644 index 0000000..81a758f --- /dev/null +++ b/SimplePool/db/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/20 9:33 +# @Author:LiuYiJie +# @file: __init__.py diff --git a/SimplePool/db/__pycache__/__init__.cpython-38.pyc b/SimplePool/db/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..9f8dabf Binary files /dev/null and b/SimplePool/db/__pycache__/__init__.cpython-38.pyc differ diff --git a/SimplePool/db/__pycache__/ipdb.cpython-38.pyc b/SimplePool/db/__pycache__/ipdb.cpython-38.pyc new file mode 100644 index 0000000..a7b4f4c Binary files /dev/null and b/SimplePool/db/__pycache__/ipdb.cpython-38.pyc differ diff --git a/SimplePool/db/__pycache__/mongodb.cpython-38.pyc b/SimplePool/db/__pycache__/mongodb.cpython-38.pyc new file mode 100644 index 0000000..a426da9 Binary files /dev/null and b/SimplePool/db/__pycache__/mongodb.cpython-38.pyc differ diff --git 
a/SimplePool/db/__pycache__/redisdb.cpython-38.pyc b/SimplePool/db/__pycache__/redisdb.cpython-38.pyc new file mode 100644 index 0000000..a0db918 Binary files /dev/null and b/SimplePool/db/__pycache__/redisdb.cpython-38.pyc differ diff --git a/SimplePool/db/ipdb.py b/SimplePool/db/ipdb.py new file mode 100644 index 0000000..25e0b6d --- /dev/null +++ b/SimplePool/db/ipdb.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/25 16:01 +# @Author:LiuYiJie +# @file: ipdb +""" +对ip库里面的代理进行操作 +""" +from SimplePool.db.redisdb import AsyncRedisDB +from SimplePool.db.mongodb import AsyncMongoDB +from SimplePool import setting +from typing import Tuple, Union + + +class ProxyOperate(AsyncRedisDB): + def __init__(self): + super(ProxyOperate, self).__init__() + self._init_score = setting.PROXY_DETECTION_INIT + self._max_score = setting.PROXY_DETECTION_MAX + self._min_score = setting.PROXY_DETECTION_MIN + + async def add_proxy(self, table: str, values): + """ + 判断库里是否有当前ip,有,不增加, + 新增一个代理,并赋给一个初始化分数,防止获取代理后,不能使用,却还是获取了 + :param table: 表名 + :param values:代理ip + :return: + """ + return await self.zadd(table=table, values=values) + + async def reduce_proxy_score(self, table, values): + """ + 验证库里的ip是否可用,可用加1 + :param table: + :param values: + :return: + """ + verify_list = list() + # 判断分数是否操超过100,超过100不增加分数 + if isinstance(values, list): + for value in values: + score = await self.zscore(table, value) + # 当分数小于最大分数,分数加1 + if score < self._max_score: + verify_list.append(value) + return await self.zincrby(table=table, values=verify_list, scores=1) + else: + score = await self.zscore(table, values) + # 当分数小于最大分数,分数加1 + if score < self._max_score: + return await self.zincrby(table=table, values=values, scores=1) + + async def increase_proxy_score(self, table, values): + """ + 验证库里的ip是否可用,不可用减一 + :param table: + :param values: + :return: + """ + # 判断分数是否小于最小分数,小于最小分数不在减少 + verify_list = list() + if isinstance(values, list): + for value in values: + score = await 
self.zscore(table, value) + # 当分数小于等于最大分数,大于等于最小分数,分数减1 + if self._min_score <= score <= self._max_score: + verify_list.append(value) + return await self.zincrby(table=table, values=verify_list, scores=-1) + else: + score = await self.zscore(table, values) + # 当分数小于等于最大分数,大于等于最小分数,分数减1 + if self._min_score <= score <= self._max_score: + return await self.zincrby(table=table, values=values, scores=-1) + + async def get_proxies(self, table, ): + """ + 返回指定数量代理,分数由高到低排序 + :param score_max: 最大分数 + :param score_min: 最小分数 + :param table: 表名 + :param count: 获取代理数量 + """ + # 首先返回分数为100的最新的代理 + return await self.hgetall(table) + + async def count_all_proxies(self, table): + """ + 返回所有代理总数 + :return 数量 + """ + return await self.zcard(table) + + async def clear_proxies(self, table): + """ + 删除分数小于最低分数的代理 + :return: + """ + # 查询所有小于最小分数的代理 + reNum = await self.zremrangebyscore(table, score_min=80, score_max=self._min_score - 1) + return reNum + + async def count_score_proxies(self, table, score_min=100, score_max=100) -> int: + """ + 查询min和max分数之间的元素的数量 + :param score_max: + :param score_min: + :param table: + :return: 数量 + """ + return await self.zcount(table, score_min=score_min, score_max=score_max) + + async def all_proxies(self, table, score_min=90, score_max=100): + """ + 查询所有的代理 + :param table: + :param score_min: + :param score_max: + :return: 所有代理 + """ + return await self.zrevrangebyscore(table, score_min=score_min, score_max=score_max) + + +class ProxyMongo(AsyncMongoDB): + def __init__(self): + super(ProxyMongo, self).__init__() + + async def add_proxy(self, table: str, values: Union[list, dict]): + """ + 判断库里是否有当前ip,有,不增加, + :param table: 表名 + :param values:代理ip + :return:返回成功插入的数量 + """ + if isinstance(values, list): + affect_count, inserted_ids, up_count = await self.add_batch(coll_name=table, datas=values) + else: + affect_count, inserted_ids, up_count = await self.add_batch(coll_name=table, datas=[values]) + return affect_count, inserted_ids, up_count + + 
async def update_proxy(self, table: str, value: dict): + """ + 验证完成后更新代理 + :param table: + :param value: + :return: + """ + condition = {'_id': value['_id']} + return await self.update(coll_name=table, condition=condition, data=value) + + async def get_all_proxies(self, table, condition: dict = None, limit: int = 0): + """ + 查询所有代理用来验证 + :param limit: 结果数量 + :param table: + :param condition: 条件 + :return: + """ + condition = {} if condition is None else condition + return await self.find(coll_name=table, condition=condition, limit=limit) + + async def get_proxies(self, table, condition: Union[Tuple, dict] = None, display_name: dict = None, limit: int = 1): + """ + 返回可用代理, + :param display_name: 指定返回的字段 + :param table: + :param condition: 根据条件查询,成功率,地区,时间 + :param limit:条数 + :return: + """ + display_name = {} if display_name is None else display_name + # 默认,随机返回一条成功率90以上的 + if not condition: + condition = {"verify_success_rate": {"$gte": 90}} + # 使用聚合管道来进行随机抽样 + limit = limit or 1 + results = await self.find_condition(coll_name=table, condition=condition, display_name=display_name, + limit=limit) + # 有条件,默认返回符合条件的 + else: + condition = {"$and": [condition, {"verify_success_rate": {"$gte": 90}}]} + limit = limit or 1 + results = await self.find_condition(coll_name=table, condition=condition, display_name=display_name, + limit=limit) + # 啥都没有,随便返回一条 + if not results: + results = await self.find_condition(coll_name=table, display_name=display_name, limit=limit) + return results + + async def count_proxies(self, table: str, condition: dict = None) -> int: + """ + 查询当前库里有多少代理 + :param table: + :param condition: + :return: + """ + condition = {} if condition is None else condition + collection = self.get_collection(table) + count = await collection.count_documents(condition) + return count + + async def delete_proxies(self, table:str, condition:dict = None): + """ + 删除库里超过过期时间的代理 + :param table: + :param condition: + :return: + """ + if condition is None: + raise 
ValueError("当前删除条件为空,error!!!!!") + delete_count = await self.delete(coll_name=table, condition=condition) + return delete_count diff --git a/SimplePool/db/mongodb.py b/SimplePool/db/mongodb.py new file mode 100644 index 0000000..3ca85d9 --- /dev/null +++ b/SimplePool/db/mongodb.py @@ -0,0 +1,268 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/25 16:01 +# @Author:LiuYiJie +# @file: mongodb + +import asyncio +from urllib import parse +from typing import Union, Tuple + +from pymongo.errors import ( + DuplicateKeyError, BulkWriteError +) +from motor.motor_asyncio import ( + AsyncIOMotorClient, + AsyncIOMotorDatabase, + AsyncIOMotorCollection +) +import SimplePool.setting as setting +from SimplePool.log_code.log import logger + + +class AsyncMongoDB: + def __init__( + self, + host=None, + port=None, + db=None, + username=None, + password=None, + url=None, + loop=None, + **kwargs + ): + self._loop = loop + if url: + self._get_mongo(url) + else: + self._host = host or setting.MONGODB_HOST + self._port = port or setting.MONGODB_PORT + self._db = db or setting.MONGODB_DB + self._username = username or setting.MONGODB_USERNAME + self._password = password or setting.MONGODB_PASSWORD + self._get_mongo() + + @classmethod + def from_url(cls, url, **kwargs): + """ + mongodb://username:password@host:port + 解析还是直接传呢""" + if parse.urlparse(url).scheme != 'mongodb': + raise ValueError('url error, please use "mongodb://username:password@host:port"') + return cls(url=url, **kwargs) + + def _get_mongo(self, url: str = None): + # 创建对数据库的引用不会执行I/O,不需要 await 表达式 + loop = self._loop or asyncio.get_event_loop() + if url: + self._clint: AsyncIOMotorClient = AsyncIOMotorClient(url, io_loop=loop) + else: + self._clint: AsyncIOMotorClient = AsyncIOMotorClient(self._host, self._port, io_loop=loop) + self._db: AsyncIOMotorDatabase = self._get_database(self._db) + + def _get_database(self, database, **kwargs): + """ + 根据数据库名获取数据库对象 + :param database: 数据库名 + :param kwargs: + :return: + """ + return 
self._clint.get_database(database, **kwargs) + + def get_collection(self, coll_name) -> AsyncIOMotorCollection: + """ + 根据集合名获取集合对象 + :param coll_name: 集合名 + :return: + """ + return self._db[coll_name] + + async def add(self, coll_name: str, data: dict): + """ + :param coll_name: 集合名 + :param data: 单条数据 {'_id': 'xx'} + :return: 插入影响的行数 + """ + collection = self.get_collection(coll_name) + affect_count = None + try: + result = await collection.insert_one(data) + except DuplicateKeyError as dup: + logger.info( + """ + error: %s + """ % dup) + except Exception as e: + logger.warning('error: %s' % e) + else: + affect_count = 1 + return affect_count + + async def add_batch(self, coll_name: str, datas: list, replace: bool = False): + """ + :param coll_name: 集合名 + :param datas: 多条数据 [{'_id': 'xx'}, ...] + :param replace: + :return: (插入影响的行数, 插入的id) + """ + collection = self.get_collection(coll_name) + inserted_ids = [] + affect_count = 0 + up_count = 0 + try: + affect_count = len(datas) + result = await collection.insert_many(datas, ordered=False) + except DuplicateKeyError as dup: + logger.warning( + """ + error: %s + """ % dup) + except BulkWriteError as bulk_write_e: + # 获取插入失败的代理 + write_errors = bulk_write_e.details.get('writeErrors') + for write_error in write_errors: + # 判断是否是因为唯一索引插入失败 + if write_error.get('code') == 11000: + original_doc = write_error.get('op') # 插入的数据 + ip_id = original_doc.get('third_id') + filter_query = {'ip_id': ip_id} + update_query = {'$set': original_doc} + up_result = await collection.update_one(filter=filter_query, update=update_query) + affect_count -= 1 + up_count = up_result.modified_count + except Exception as e: + logger.error( + """ + error: %s + """ % e) + else: + inserted_ids = result.inserted_ids + affect_count = len(inserted_ids) + return affect_count, inserted_ids, up_count + + async def delete(self, coll_name: str, condition: dict = None): + """ + :param coll_name: 集合名 + :param condition: 删除条件 {'i': {'$gt': 1000}} + :return: 
删除的条数 + """ + if condition is None: + condition = {} + collection = self.get_collection(coll_name) + count = await collection.count_documents(condition) + try: + result = await collection.delete_many(condition) + except Exception as e: + logger.warning( + """ + error: %s + condition: %s + count: %s + """ % (e, condition, count)) + return False + return result.deleted_count + + async def update(self, coll_name: str, data: dict, condition: dict = None, upsert: bool = True): + """ + :param coll_name: 集合名 + :param condition: 更新条件 {'i': {'$gt': 1000}} + :param data: 修改的值 {'$set': {'key': 'value'}} + :param upsert: 不存在则插入 + :return: 满足条件的条数 + """ + if condition is None: + condition = {} # 如果条件为空将会查找所有 + collection = self.get_collection(coll_name) + try: + result = await collection.update_one(condition, {'$set': data}, upsert=upsert) + except Exception as e: + logger.warning( + """ + error: %s + condition: %s + """ % (e, condition)) + return False + return result.modified_count + + async def update_batch(self, coll_name: str, datas: dict, condition: dict = None, upsert: bool = True): + """ + 更新多条数据,如果不存在则插入 + :param coll_name: + :param datas: + :param condition: + :param upsert: + :return: + """ + if condition is None: + condition = {} # 如果条件为空将会查找所有 + collection = self.get_collection(coll_name) + try: + result = await collection.update_many(condition, {'$set': datas}, upsert=upsert) + except Exception as e: + logger.warning( + """ + error: %s + condition: %s + """ % (e, condition)) + return False + return result.modified_count + + async def find(self, coll_name: str, condition: Union[Tuple, dict] = None, display_name: dict = None, + limit: int = 0, **kwargs): + """ + :param display_name: 返回的字段 + :param coll_name: 集合名 + :param condition: 查询条件 {'i': {'$lt': 4}} + :param limit: 结果数量 + :return: 插入影响的行数 + """ + condition = {} if condition is None else condition + display_name = {} if display_name is None else display_name + collection = self.get_collection(coll_name) + 
results = [] + if limit == 1: + results.append(await collection.find_one(condition, display_name)) + return results + elif limit > 1: + cursor = collection.find(condition, display_name) + for document in await cursor.to_list(length=limit): + results.append(document) + else: + find_results = collection.find(condition, display_name) + async for document in find_results: + results.append(document) + return results + + async def find_condition(self, coll_name: str, condition: Union[Tuple, dict] = None, display_name: dict = None, + limit: int = 0, **kwargs): + """ + :param display_name: 指定返回的字段 + :param coll_name: 集合名 + :param condition: 管道,根据条件查询,随机返回指定数量代理 + :param limit: 结果数量 + :return: 插入影响的行数 + """ + condition = {} if condition is None else condition + displayName = {} if display_name is None else display_name + pipeline = [ + {'$match': condition}, + {"$project": displayName}, + {"$sample": {"size": limit}} + ] + collection = self.get_collection(coll_name) + results = await collection.aggregate(pipeline).to_list(limit) + return results + + async def count(self, coll_name: str, condition: dict): + """ + :param coll_name: 集合名 + :param condition: 查询条件 {'i': {'$gt': 1000}} + :return: 满足条件的条数 + """ + condition = {} if condition is None else condition + collection = self.get_collection(coll_name) + count = await collection.count_documents(condition) + return count + + async def command(self, command): + return await self._db.command(command) diff --git a/SimplePool/db/redisdb.py b/SimplePool/db/redisdb.py new file mode 100644 index 0000000..bc8b2d5 --- /dev/null +++ b/SimplePool/db/redisdb.py @@ -0,0 +1,431 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/20 9:33 +# @Author:LiuYiJie +# @file: redis +import redis +import aioredis +import SimplePool.setting as setting +from typing import Union, List, Any, TypeVar + +__all__ = ( + "redisDB", + "AsyncRedisDB" +) + +_RedisT = TypeVar('_RedisT', bound='RedisDB') +_AioRedisT = TypeVar('_AioRedisT', bound='AsyncRedisDB') +ZScoreT = 
Union[float, str] + + +class redisDB: + def __init__(self, host=None, port=None, db=None, password=None): + self.host = host or setting.REDIS_HOST + self.port = port or setting.REDIS_PORT + self.db = db or setting.REDIS_DB + self.password = password or setting.REDIS_PASSWORD + self._redis = None + self._pool = None + self.get_redis() + + def get_redis(self): + self._pool = redis.ConnectionPool( + host=self.host, + port=self.port, + db=self.db, + password=self.password, + decode_responses=True + ) + self._redis = redis.Redis(connection_pool=self._pool, decode_responses=True) + + @property + def redis(self) -> redis.Redis: + return self._redis + + def hset(self, name, datas): + """ + :param: 存储字典 + :return: + """ + pipe = self.redis.pipeline() + pipe.multi() + for key, value in datas: + pipe.hset(name, key, value) + pipe.multi() + + def keys(self, pattern: Union[str] = '*') -> list: + return self.redis.keys(pattern=pattern) + + +class AsyncRedisDB: + """使用aioredis调用时惰性初始化""" + + def __init__( + self, + host=None, + port: int = 6379, + db: int = 0, + password: str = None + ): + self._host = host or setting.REDIS_HOST + self._port = port or setting.REDIS_PORT + self._db = db or setting.REDIS_DB + self._password = password or setting.REDIS_PASSWORD + + self.__redis: Union[aioredis.Redis, None] = None + + @property + def _redis(self): + if self.__redis is None: + self.get_connect() + return self.__redis + + @_redis.setter + def _redis(self, val): + self.__redis = val + + def get_connect(self): + self._redis = aioredis.Redis( + host=self._host, + port=self._port, + db=self._db, + password=self._password, + decode_responses=True + ) + + async def sadd(self, table, values) -> Union[int, None]: + """ + 使用无序set存储数据,用来去重 + :param table: + :param values: 支持单个值或多个值 + :return: 如返回0:库中存在,返回1:入库,批量添加无返回值 + """ + if isinstance(values, list): + pipe = self._redis.pipeline() + pipe.multi() + for value in values: + await pipe.sadd(table, value) + await pipe.execute() + + else: + return 
await self._redis.sadd(table, values) + + async def hset(self, table, values): + """ + 使用hash存储数据 + :param table: + :param values: + :return: + """ + return await self._redis.hset(table, mapping=values) + + async def hgetall(self, table): + return await self._redis.hgetall(table) + + async def scard(self, table) -> int: + """ + 获取set中元素个数,类似于len + :rtype: object + :param table: + :return: 集合中元素个数 + """ + return await self._redis.scard(table) + + async def zcard(self, table) -> int: + """ + 返回已排序的集合中元素数量 + :rtype: object + :param table: + :return: 集合中元素个数 + """ + return await self._redis.zcard(table) + + async def zcount(self, table, score_min: ZScoreT, score_max: ZScoreT) -> int: + """ + 返回有序集合中分数处于min和max直接的元素数量 + :param score_max: + :param score_min: + :param table: + :return: + """ + return await self._redis.zcount(table, min=score_min, max=score_max) + + async def sismember(self, table, value) -> bool: + """ + 判断是否是集合的成员 类似in + :param table: + :param value: + :return: + """ + return await self._redis.sismember(table, value) + + async def srem(self, table, values): + """ + 从set中指定值删除 + :param table: + :param values: + :return: + """ + if isinstance(values, list): + pipe = self._redis.pipeline() + pipe.multi() + for value in values: + await pipe.srem(table, value) + else: + return await self._redis.srem(table, values) + + async def zadd(self, table, values, scores=0): + """ + 在table对应的有序集合中添加元素 + :param table: + :param values: + :param scores: + :return: + """ + if isinstance(values, list): + # scores数量需要与values相等 + if not isinstance(scores, list): + scores = [scores] * len(values) + else: + assert len(values) == len(scores) + pipe = self._redis.pipeline() + pipe.multi() + for value, score in zip(values, scores): + await pipe.execute_command( + 'ZADD', table, score, value + ) + return await pipe.execute() + else: + return await self._redis.execute_command( + 'ZADD', table, scores, values) + + async def zincrby(self, table, values, scores=1): + """ + 
在table对应的有序集合中增加元素分数 + :param table: + :param values: + :param scores: + :return: + """ + if isinstance(values, list): + # scores数量需要与values相等 + if not isinstance(scores, list): + scores = [scores] * len(values) + else: + assert len(values) == len(scores) + pipe = self._redis.pipeline() + pipe.multi() + for value, score in zip(values, scores): + await pipe.execute_command("ZINCRBY", table, score, value) + return await pipe.execute() + else: + return await self._redis.execute_command( + 'ZINCRBY', table, scores, values) + + async def zscore(self, table, values): + if isinstance(values, list): + # scores数量需要与values相等 + pipe = self._redis.pipeline() + pipe.multi() + for value in values: + await pipe.execute_command("ZSCORE", table, value) + return await pipe.execute() + else: + return await self._redis.execute_command("ZSCORE", table, values) + + async def zexists( + self, name: str, + values: Union[List[Any], Any] + ) -> Union[List[bool], bool]: + """ + 判断元素是否在zset中存在,通过分数判断,如果分数存在则元素存在 + :param name: + :param values: + :return: + """ + + is_exists = [] + + if isinstance(values, list): + pipe = await self._redis.pipeline() + pipe.multi() + for value in values: + pipe.zscore(name, value) + score_results = await pipe.execute() + + for is_exist in score_results: + if is_exist is not None: + is_exists.append(1) + else: + is_exists.append(0) + else: + score_results = await self._redis.zscore(name, values) + is_exists = 1 if score_results is not None else 0 + + return is_exists + + async def zrange(self): + raise NotImplemented + + async def zrem(self, name, values): + """ + 移除集合内指定元素 + :param name: + :param values: + :return: + """ + if isinstance(values, list): + await self._redis.zrem(name, *values) + else: + await self._redis.zrem(name, values) + + async def zremrangebyscore( + self, name: str, score_min: ZScoreT, score_max: ZScoreT + ) -> int: + """ + 移除指定分数区间的成员 + https://www.runoob.com/redis/sorted-sets-zremrangebyscore.html + :param name: + :param score_min: + 
:param score_max: + :return: + """ + removed = await self._redis.zremrangebyscore( + name=name, min=score_min, max=score_max + ) + return removed + + async def zrangebyscore( + self, name: str, score_min: ZScoreT, score_max: ZScoreT, count: int = None, + **kwargs + ): + """ + 返回score_min ~ score_max区间的的元素 + :param name: + :param score_min: + :param score_max: + :param count: 整合start和num + :param kwargs: + :return: + """ + if count is None: + result = await self._redis.zrangebyscore( + name, min=score_min, max=score_max, + **kwargs + ) + else: + result = await self._redis.zrangebyscore( + name, min=score_min, max=score_max, start=0, num=count, + **kwargs + ) + return result + + async def zrevrangebyscore( + self, name: str, score_min: ZScoreT, score_max: ZScoreT, count: int = None, + **kwargs + ): + """ + 返回score_min ~ score_max区间的的元素, 和zrangebyscore相反 + :param name: + :param score_min:最大分数 + :param score_max:最小分数 + :param count: 整合start和num + :param kwargs: + :return: + """ + if count is None: + result = await self._redis.zrevrangebyscore( + name, min=score_max, max=score_min, + **kwargs + ) + else: + result = await self._redis.zrevrangebyscore( + name, min=score_max, max=score_min, start=0, num=count, + **kwargs + ) + return result + + async def zrangebyscore_set_score( + self, name, score_min: ZScoreT, score_max: ZScoreT, new_score, count: int = None + ) -> list: + # 使用lua脚本, 保证操作的原子性 + lua = """ + -- local key = KEYS[1] + local min_score = ARGV[1] + local max_score = ARGV[2] + local set_score = ARGV[3] + local count = ARGV[4] + + -- 取值 + local datas = nil + if count then + datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count) + else + datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores') + end + + local real_datas = {} -- 数据 + --修改优先级 + for i=1, #datas, 2 do + local data = datas[i] + local score = datas[i+1] + + table.insert(real_datas, data) -- 添加数据 + + redis.call('zincrby', KEYS[1], 
set_score - score, datas[i]) + end + + return real_datas + + """ + cmd = self._redis.register_script(lua) + if count: + res = await cmd(keys=[name], args=[score_min, score_max, new_score, count]) + else: + res = await cmd(keys=[name], args=[score_min, score_max, new_score]) + + return res + + async def zpop( + self, name, start: int = None, end: int = None, count: int = 1, + desc: bool = False, remove: bool = True + ) -> List[Any]: + """ + 按照索引范围获取name对应的有序集合的元素 + :param name: redis的key + :param start: 有序集合索引起始位置(非分数) + :param end: 有序集合索引结束位置(非分数),包含end,闭区间 + :param count: 获取的数量,在没有start和end参数时,由count来解构 + :param desc: 排序规则,默认按照分数从小到大排序 + :param remove: 是否移除,默认为True + :return: + """ + if start is None: + start = 0 + if end is None: + end = count - 1 if count > 0 else count + pipe = self._redis.pipeline() + pipe.multi() + await pipe.zrange(name=name, start=start, end=end, desc=desc) + if remove: + await pipe.zremrangebyrank(name=name, min=start, max=end) + results, count = await pipe.execute() + return results + + async def rpush(self, table, values): + """ + 使用list存储数据 + :param table: + :param values: + :return: + """ + if isinstance(values, list): + pipe = self._redis.pipeline() + pipe.multi() + for value in values: + await pipe.rpush(table, value) + await pipe.execute() + + else: + return await self._redis.rpush(table, values) + + diff --git a/SimplePool/ipserver/__init__.py b/SimplePool/ipserver/__init__.py new file mode 100644 index 0000000..6fe4e29 --- /dev/null +++ b/SimplePool/ipserver/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/20 13:36 +# @Author:LiuYiJie +# @file: __init__.py diff --git a/SimplePool/ipserver/__pycache__/__init__.cpython-38.pyc b/SimplePool/ipserver/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..2c6aba8 Binary files /dev/null and b/SimplePool/ipserver/__pycache__/__init__.cpython-38.pyc differ diff --git a/SimplePool/ipserver/__pycache__/agent_store.cpython-38.pyc 
b/SimplePool/ipserver/__pycache__/agent_store.cpython-38.pyc new file mode 100644 index 0000000..4cadb94 Binary files /dev/null and b/SimplePool/ipserver/__pycache__/agent_store.cpython-38.pyc differ diff --git a/SimplePool/ipserver/__pycache__/crawler.cpython-38.pyc b/SimplePool/ipserver/__pycache__/crawler.cpython-38.pyc new file mode 100644 index 0000000..67f874e Binary files /dev/null and b/SimplePool/ipserver/__pycache__/crawler.cpython-38.pyc differ diff --git a/SimplePool/ipserver/__pycache__/getip.cpython-38.pyc b/SimplePool/ipserver/__pycache__/getip.cpython-38.pyc new file mode 100644 index 0000000..596ccf4 Binary files /dev/null and b/SimplePool/ipserver/__pycache__/getip.cpython-38.pyc differ diff --git a/SimplePool/ipserver/__pycache__/ipapi.cpython-38.pyc b/SimplePool/ipserver/__pycache__/ipapi.cpython-38.pyc new file mode 100644 index 0000000..7e3eb84 Binary files /dev/null and b/SimplePool/ipserver/__pycache__/ipapi.cpython-38.pyc differ diff --git a/SimplePool/ipserver/__pycache__/scheduler.cpython-38.pyc b/SimplePool/ipserver/__pycache__/scheduler.cpython-38.pyc new file mode 100644 index 0000000..c1bc258 Binary files /dev/null and b/SimplePool/ipserver/__pycache__/scheduler.cpython-38.pyc differ diff --git a/SimplePool/ipserver/__pycache__/verifyip.cpython-38.pyc b/SimplePool/ipserver/__pycache__/verifyip.cpython-38.pyc new file mode 100644 index 0000000..c9efe85 Binary files /dev/null and b/SimplePool/ipserver/__pycache__/verifyip.cpython-38.pyc differ diff --git a/SimplePool/ipserver/agent_store.py b/SimplePool/ipserver/agent_store.py new file mode 100644 index 0000000..2125840 --- /dev/null +++ b/SimplePool/ipserver/agent_store.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/28 17:41 +# @Author:LiuYiJie +# @file: agent_store +""" +爬虫模块,用力增加代理商获取ip +""" +import random +import json +from SimplePool.ipserver.crawler import downLoader +from SimplePool.log_code.log import logger +import datetime + + +class SpiderMeta(type): + """ + 
爬虫元类,后续所有不同的代理都要继承此类 + """ + spiders = list() + + def __new__(mcs, *args, **kwargs): + """ + 子类构造 + :param args: args[0]=name. args[1]=bases, args[2]=attrs + :param kwargs: + """ + if not args[2].get('run', ''): + raise ValueError(f'当前{args[0]}类无run方法, 请实现') + new_cls = super().__new__(mcs, *args, **kwargs) + SpiderMeta.spiders.append(new_cls) + return new_cls + + +class ZhiMaSpider(metaclass=SpiderMeta): + start_url = 'https://icanhazip.com/' + source_name = '芝麻代理' + codes = { + 111: "提取链接请求太过频繁,超出限制,请在1秒后再次请求", + 113: "白名单未添加/白名单掉了,请设置为白名单!", + 114: "余额不足", + 115: "没有资源或没有符合条件的数据,请更换地区等条件重新生成api链接地址", + 116: "您的套餐今日已到达上限!", + 117: "您的套餐pack传参有误!请检测您现在的ip是否在套餐所在账户!", + 118: "您的账户异常,请联系客服!账户处于被禁用状态", + 121: "您的该套餐已经过期了!", + 401: "白名单错误/使用的IP已经过期", + 403: "客户目标网站异常,联系客服处理", + } + + async def run(self): + content = await downLoader.download(self.start_url) + results_b = content.strip().decode('utf-8') + results = json.loads(results_b) + if results['code'] == 0: + proxies = list() + data = [ + { + "ip": "49.68.68.197", + "port": random.randint(1111, 6000), + "expire_time": "2019-05-24 08:58:31", + "city": "徐州市", + "isp": "电信" + }, + { + "ip": "58.218.201.108", + "port": random.randint(1111, 6000), + "expire_time": "2019-05-24 08:55:31", + "city": "苏州市", + "isp": "电信", + "outip": "219.136.47.161", + } + ] + # data = content['data'] + for oneIp in data: + proxy = f"{oneIp['ip']}:{oneIp['port']}" + detail = {'ip_id': proxy, 'city': oneIp.get('city', ''), 'isp': oneIp.get('isp', ''), 'verify_num': 0, + 'verify_success_rate': 0, 'verify_success_num': 0, 'verify_error_num': 0, + 'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'verify_time': '', "next_verify_time": "", 'expire_time': oneIp.get('expire_time', ''), + 'proxy_source': self.source_name} + proxies.append(detail) + # 验证新获取的ip,然后入库 + return proxies + else: + logger.warning(f'提取{self.source_name}错误, 错误原因为{self.codes.get(results["code"])}') + + +class Daili666Spider(metaclass=SpiderMeta): + 
start_url = 'https://icanhazip.com/' + source_name = '66代理' + + async def run(self): + content = await downLoader.download(self.start_url) + proxies = list() + data = [ + { + "ip": "49.68.68.197", + "port": random.randint(1111, 6000), + "expire_time": "2019-05-24 08:58:31", + "city": "南京市", + "isp": "电信" + }, + { + "ip": "58.218.201.108", + "port": random.randint(1111, 6000), + "expire_time": "2019-05-24 08:55:31", + "city": "上海市", + "isp": "移动", + "outip": "219.136.47.161", + } + ] + # data = content['data'] + for one in data: + proxy = f"{one['ip']}:{one['port']}" + detail = {'ip_id': proxy, 'city': one.get('city', ''), 'isp': one.get('isp', ''), 'verify_num': 0, + 'verify_success_rate': 0, 'verify_success_num': 0, 'verify_error_num': 0, + 'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'verify_time': '', "next_verify_time": "", 'expire_time': one.get('expire_time', ''), + 'proxy_source': self.source_name} + proxies.append(detail) + # 验证新获取的ip,然后入库 + return proxies +# +# +# class KuaidailiSpider(metaclass=SpiderMeta): +# start_url = 'http://www.kuaidaili.com/free/inha/{}/' +# +# async def run(self, page_total=2): +# # urls = [self.start_url.format(i) +# # for i in range(self._counter, self._counter + page_total)] +# # self.increment(page_total) +# ans = [] +# return self.start_url +# +# +# class XiciSpider(metaclass=SpiderMeta): +# start_url = 'http://www.xicidaili.com/nn/{}' +# +# async def run(self, page_total=2): +# # urls = [self.start_url.format(i) +# # for i in range(self._counter, self._counter + page_total)] +# # self.increment(page_total) +# ans = [] +# return self.start_url + + +# spiders = [cls() for cls in SpiderMeta.spiders] +# for spider in spiders: +# print(spider.run()) diff --git a/SimplePool/ipserver/crawler.py b/SimplePool/ipserver/crawler.py new file mode 100644 index 0000000..240721d --- /dev/null +++ b/SimplePool/ipserver/crawler.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# @date:2023/12/29 10:16 +# 
# --- SimplePool/ipserver/crawler.py ---
"""
Request a proxy-vendor URL and return the raw response body.
"""
import aiohttp
import datetime
import asyncio
from SimplePool.log_code.log import logger


class Downloader:
    """Thin aiohttp wrapper shared by all spider classes."""

    @staticmethod
    async def fetch(url):
        """GET *url* and return the body as ``bytes``.

        On any request error a JSON fallback body is returned so callers can
        always ``json.loads`` the result.
        """
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False), trust_env=True) as session:
            try:
                async with session.request(method='get', url=url, timeout=5) as response:
                    content = await response.read()
            except Exception as e:
                logger.error(f'{url}请求出错, {e}')
                # Fix: the old fallback b"{'status': 1000, 'data': []}" was
                # not valid JSON (single quotes) and used the wrong key —
                # callers read results['code'].
                content = b'{"code": 1000, "data": []}'
        # Fix: returning from ``finally`` (as the original did) silently
        # swallows any exception raised inside the except handler.
        return content

    async def download(self, url):
        """Public entry point; kept separate so retry logic can be added later."""
        return await self.fetch(url)


downLoader = Downloader()


# --- SimplePool/ipserver/getip.py ---
"""
Request proxies from every registered provider and store them in MongoDB.
"""
import aiohttp
import asyncio
import datetime
import time
import json
from SimplePool.log_code.log import logger
from SimplePool.db.ipdb import ProxyMongo
import SimplePool.setting as setting
from SimplePool.ipserver.agent_store import SpiderMeta


class GetIp:
    """One fetch round: ask each provider for proxies and insert them."""

    def __init__(self):
        self._db = ProxyMongo()
        self.proxy_num = 0  # proxies inserted during this round
        self.proxy_num_min = setting.PROXY_NUMBER_MIN

    async def ip_count(self):
        """Return True when the pool holds more than ``proxy_num_min``
        proxies with a verify success rate of at least 90."""
        condition = {"verify_success_rate": {"$gte": 90}}
        count = await self._db.count_proxies(table=setting.MONGODB_COLL, condition=condition)
        return count > self.proxy_num_min

    async def run(self):
        """Fetch new proxies unless the pool is already healthy."""
        logger.info(f'开始请求获取ip.....')
        # Instantiate every registered provider.
        spiders = [cls() for cls in SpiderMeta.spiders]
        # Fix: the original looped with blocking ``time.sleep(0.5)`` until
        # the pool was FULL and then fetched anyway — blocking the event
        # loop, inverting the intent, and spinning forever on an empty DB.
        # Skip fetching when enough healthy proxies already exist.
        if await self.ip_count():
            logger.info(f'当前库里分数90以上大于40,本次获取{self.proxy_num}条代理')
            return
        tasks = [asyncio.create_task(spider.run()) for spider in spiders]
        results = await asyncio.gather(*tasks)
        for result in results:
            if not result:
                # Fix: a spider returns None on a vendor error; the original
                # passed it straight to add_proxy and crashed.
                continue
            affect_count, inserted_ids, up_count = await self._db.add_proxy(
                table=setting.MONGODB_COLL, values=result)
            self.proxy_num += affect_count


# --- SimplePool/ipserver/ipapi.py ---
import uvicorn
import base64
from fastapi import FastAPI, Request, Query
from SimplePool.db.ipdb import ProxyMongo
import SimplePool.setting as setting


app = FastAPI()
app.debug = 'debug'


def get_conn():
    # NOTE(review): builds a new Mongo client per request — consider a
    # module-level singleton if connection churn becomes a problem.
    return ProxyMongo()


@app.get('/')
async def index():
    """Health-check / landing route."""
    return "Welcome to SimplePool"


@app.get('/getIp/v1/')
async def get_ip(num: int = 0):
    """Return up to *num* proxy ids (``num=0`` falls through to the DB default)."""
    db = get_conn()
    display_name = {'_id': 1}
    res = await db.get_proxies(setting.MONGODB_COLL, display_name=display_name, limit=num)
    return [ip['_id'] for ip in res]


@app.get('/getAreaIp/v1/')
async def get_area(place: str = Query(...), num: int = 0):
    """Return proxy ids whose city matches *place* (regex substring match)."""
    db = get_conn()
    condition = {'city': {"$regex": place}}
    display_name = {'_id': 1}
    res = await db.get_proxies(setting.MONGODB_COLL, condition=condition,
                               display_name=display_name, limit=num)
    return [ip['_id'] for ip in res]


if __name__ == '__main__':
    uvicorn.run('ipapi:app', host='0.0.0.0', port=8080)
# --- SimplePool/ipserver/scheduler.py ---
"""
Scheduling: run the fetch and verify jobs on fixed intervals.
"""

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from SimplePool.ipserver.getip import GetIp
from SimplePool.ipserver.verifyip import VerifyProxy
from SimplePool.log_code.log import logger


def configure_scheduler():
    """Build the scheduler with both periodic jobs registered."""
    scheduler = AsyncIOScheduler()
    scheduler.add_job(run_VerifyProxy, 'interval', seconds=22, misfire_grace_time=1)
    scheduler.add_job(run_GetIp, 'interval', seconds=11, misfire_grace_time=2)
    return scheduler


async def run_VerifyProxy():
    # Fresh VerifyProxy per run so per-round counters start at zero.
    await VerifyProxy().run()


async def run_GetIp():
    # Fresh GetIp per run so per-round counters start at zero.
    await GetIp().run()


def start_scheduler(scheduler):
    scheduler.start()
    logger.info('Scheduler started')


async def run_forever():
    """Start the scheduler and park the coroutine until cancelled."""
    scheduler = configure_scheduler()
    start_scheduler(scheduler)
    try:
        # An Event that is never set: keeps the loop alive for the jobs.
        await asyncio.Event().wait()
    except (KeyboardInterrupt, SystemExit):
        # NOTE(review): KeyboardInterrupt usually surfaces from asyncio.run()
        # in run_task(), not here — confirm shutdown path on Ctrl-C.
        logger.info('Stopping Scheduler...')
        scheduler.shutdown()


def run_task():
    """Entry point used by client.py."""
    logger.info('Starting Scheduled Tasks')
    asyncio.run(run_forever())


# --- SimplePool/ipserver/verifyip.py ---
import aiohttp
import asyncio
from datetime import datetime, timedelta
from SimplePool.db.ipdb import ProxyMongo
import SimplePool.setting as setting
from SimplePool.log_code.log import logger
from typing import Union, List


class VerifyProxy:
    """Verify stored proxies against VERIFY_BASE_URL and update their stats."""

    def __init__(self):
        self._db = ProxyMongo()
        self.concurrency = 10  # max in-flight verification requests
        self.verify_counts = 0
        self.verify_success_counts = 0
        self.semaphore = asyncio.Semaphore(value=self.concurrency)

    async def fetch(self, proxies: dict):
        """Probe one proxy and hand the outcome to :meth:`parse`.

        :param proxies: a stored proxy detail dict (must contain 'ip_id').
        """
        async with self.semaphore:
            proxy = proxies['ip_id']
            async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False), trust_env=True) as session:
                try:
                    async with session.request('get', url=setting.VERIFY_BASE_URL, proxy='', timeout=3) as response:
                        res = await response.json()
                        # NOTE(review): hard-coded stub kept from the original
                        # for local testing — real verification must use the
                        # response payload, and ``proxy=''`` must become the
                        # proxy under test.
                        res = {'status': 200, 'host': '49.68.68.197', 'port': ''}
                except Exception as e:
                    logger.error(f'验证代理{proxy}失败, 失败原因为{e}')
                    res = {'status': 400, 'host': '', 'port': ''}
            # Fix: the original returned from ``finally``, which silently
            # swallows any exception raised inside the except handler.
            return await self.parse(proxies, res)

    @staticmethod
    async def change_proxy_message(proxies, success: bool = True) -> dict:
        """Update bookkeeping fields on *proxies* after one verification.

        Consolidates the original's duplicated success/failure branches:
        only the success/error counter differs between the two paths.
        """
        now_time = datetime.now()
        proxies['verify_num'] = proxies['verify_num'] + 1
        if success:
            proxies['verify_success_num'] = proxies['verify_success_num'] + 1
        else:
            proxies['verify_error_num'] = proxies['verify_error_num'] + 1
        proxies['verify_success_rate'] = float((proxies['verify_success_num'] / proxies['verify_num']) * 100)
        proxies['verify_time'] = now_time.strftime('%Y-%m-%d %H:%M:%S')
        # Re-check this proxy 30 seconds from now.
        proxies['next_verify_time'] = (now_time + timedelta(seconds=30)).strftime('%Y-%m-%d %H:%M:%S')
        return proxies

    async def parse(self, proxies, response):
        """Decide success, persist the updated stats, and count the result.

        A 200 alone is not enough: the reported exit host must appear in
        the proxy's ip_id, otherwise the proxy is not actually routing.
        """
        success = response.get('status', '') == 200 and response.get('host') in proxies['ip_id']
        ver_proxies = await self.change_proxy_message(proxies, success=success)
        res_score = await self._db.update_proxy(table=setting.MONGODB_COLL, value=ver_proxies)
        self.verify_counts += 1
        if success:
            self.verify_success_counts += 1
            logger.info(f'{proxies}验证成功')
        else:
            logger.info(f'{proxies}验证error')
        return res_score

    async def init_run(self, proxies: Union[List[dict], dict]):
        """Verify freshly fetched proxies directly (not read from the DB)."""
        if isinstance(proxies, list):
            tasks = [asyncio.create_task(self.fetch(proxy)) for proxy in proxies]
            await asyncio.gather(*tasks)
        else:
            await self.fetch(proxies)

    async def deal_over_proxy(self):
        """Delete proxies whose expire_time has passed; return the count."""
        now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        condition = {"expire_time": {"$lte": now_time}}
        count = await self._db.delete_proxies(table=setting.MONGODB_COLL, condition=condition)
        return count

    async def run(self):
        """Verify every proxy whose next_verify_time is due, then purge expired ones."""
        logger.info('start verify proxy......')
        now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Only proxies scheduled before "now" are verified this round.
        condition = {"next_verify_time": {"$lte": now_time}}
        verify_proxies = await self._db.get_all_proxies(table=setting.MONGODB_COLL, condition=condition)
        tasks = [asyncio.create_task(self.fetch(proxy)) for proxy in verify_proxies]
        await asyncio.gather(*tasks)
        delete_count = await self.deal_over_proxy()
        logger.info(f'本次验证{self.verify_counts}条代理, 成功{self.verify_success_counts}条, 本次删除{delete_count}条过期代理')
# --- SimplePool/log_code/log.py ---
"""
Logging setup: a coloured console handler plus a daily-rotating file handler.
"""
import os
from os.path import dirname, abspath, join
import logging
from logging import handlers
import SimplePool.setting as setting
import datetime


class ColoredFormatter(logging.Formatter):
    """Formatter that wraps each record field in ANSI colour escapes."""

    COLORS = {
        "black": "\033[40m",
        "red": "\033[91m",
        "green": "\033[92m",
        "yellow": "\033[93m",
        "blue": "\033[34m",
        "purple": "\033[95m",
        "dgreen": "\033[96m",
        "white": "\033[97m",
        "reset": '\033[0m',
    }

    # Colour per level name; extra lowercase keys are kept for compatibility
    # with the original mapping.
    DEFAULT_STYLES = {
        "spam": COLORS['green'],
        "DEBUG": COLORS['blue'],
        "verbose": COLORS['blue'],
        "INFO": COLORS['white'],
        "WARNING": COLORS['yellow'],
        "success": COLORS['green'],
        "ERROR": COLORS['red'],
        "CRITICAL": COLORS['red'],
        "EXCEPTION": COLORS['red'],

        "asctime": COLORS['green'],
        "message": COLORS['green'],
        "lineno": COLORS['purple'],
        "threadName": COLORS['red'],
        "module": COLORS['red'],
        "levelname": COLORS['white'],
        "name": COLORS['blue'],
        "default": COLORS['blue'],
    }

    def __init__(self, styles=None):
        super().__init__()
        self.styles = styles or self.DEFAULT_STYLES

    def set_color(self, levelname: str = None):
        """Return the escape code for *levelname*.

        Fix: the original default was the literal string "reset", which
        printed the word instead of emitting the ANSI reset escape for
        unknown level names.
        """
        return self.styles.get(levelname, self.COLORS["reset"])

    def format(self, record):
        levelname = record.levelname
        reset = self.COLORS['reset']
        default = self.styles.get('default')
        asctime = f"{default}{datetime.datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S')}{reset}"
        threadName = f"{default}{record.threadName}{reset}"
        pathname = f"{default}{record.pathname}{reset}"
        lineno = f"{default}{record.lineno}{reset}"
        funcName = f"{default}{record.funcName}{reset}"
        module = f"{default}{record.module}{reset}"
        # Base Formatter with no fmt string returns the rendered message
        # (plus any exception text).
        message = super().format(record)

        levelcolor = self.set_color(levelname)
        levelname = f"{levelcolor}{levelname}{reset}"
        message = f"{levelcolor}{message}{reset}"

        return f"{threadName} - {asctime} - {levelname} - {pathname} - {module}:{funcName}:{lineno} - {message}"


class ColoredConsoleHandler(logging.StreamHandler):
    """StreamHandler preconfigured with :class:`ColoredFormatter`."""

    def __init__(self, formatter=None):
        super().__init__()
        # Use the public setter instead of poking self.formatter directly.
        self.setFormatter(formatter or ColoredFormatter())


def setup_logger():
    """Configure the root logger with console + daily-rotating file output.

    Idempotent: calling it twice (this module is imported in many places)
    no longer stacks duplicate handlers.
    """
    log_obj = logging.getLogger()
    if log_obj.handlers:
        return log_obj
    log_obj.setLevel(logging.DEBUG)

    # Coloured console output.
    console_handler = ColoredConsoleHandler()
    log_obj.addHandler(console_handler)

    log_file = join(setting.LOG_DIR, 'log_file.log')

    # Plain-text file output, rotated at midnight, 7 days retained.
    file_handler = handlers.TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7,
                                                     encoding='utf-8')
    file_handler.suffix = "%Y-%m-%d.log"
    file_handler.setFormatter(logging.Formatter(
        '%(threadName)-10s - %(asctime)s - %(module)s - %(funcName)s:%(lineno)d - %(levelname)s - %(message)s'))
    log_obj.addHandler(file_handler)

    return log_obj


logger = setup_logger()
# --- SimplePool/setting.py ---
"""
Project configuration. Values can be overridden via environment variables
(read with ``environs``); the second argument of each lookup is the default.
"""
import os
from os.path import dirname, abspath, join
from environs import Env

env = Env()
env.read_env()

ROOT_DIR = dirname(dirname(abspath(__file__)))
LOG_DIR = join(ROOT_DIR, env.str('LOG_DIR', 'logs'))
os.makedirs(LOG_DIR, exist_ok=True)

# Redis configuration.
# Fix: the original nested the same lookup twice, e.g.
# env.str("REDIS_HOST", env.str('REDIS_HOST', '127.0.0.1')) — redundant.
REDIS_HOST = env.str('REDIS_HOST', '127.0.0.1')
REDIS_PORT = env.int('REDIS_PORT', 6379)
REDIS_PASSWORD = env.str('REDIS_PASSWORD', None)
REDIS_DB = env.int('REDIS_DB', 0)

# MongoDB configuration.
MONGODB_HOST = '127.0.0.1'
MONGODB_PORT = 27017
MONGODB_DB = 'IPS'
MONGODB_COLL = 'ip_proxies'
MONGODB_USERNAME = None
MONGODB_PASSWORD = None
# If authentication is needed, use a URL: mongodb://username:password@host:port
MONGODB_URL = None


# Proxy score bounds (per-proxy quality score).
PROXY_DETECTION_MAX = 100
PROXY_DETECTION_MIN = 90
PROXY_DETECTION_INIT = 99

# Pool size bounds (number of proxies kept).
PROXY_NUMBER_MAX = 100
PROXY_NUMBER_MIN = 50

# API server.
API_HOST = env.str('API_HOST', '0.0.0.0')
API_PORT = env.int('API_PORT', 8080)
API_THREADED = env.bool('API_THREADED', True)


# Verification target URL.
VERIFY_BASE_URL = 'https://icanhazip.com/'
# VERIFY_BASE_URL = 'http://127.0.0.1:8000'
# --- SimpleServer/verify_server.py ---
"""Minimal FastAPI echo server used as a local verification target:
returns the client's source address so the pool can check that a proxy
actually rewrites the exit host."""
import uvicorn
from fastapi import FastAPI, Request


app = FastAPI()
app.debug = 'debug'


@app.get('/get/')
async def index(request: Request):
    """Echo the caller's address; the verifier compares it to the proxy ip."""
    host = request.client.host
    return {"status": 200, "host": host, "success": True}


if __name__ == '__main__':
    uvicorn.run('verify_server:app', host='0.0.0.0', port=8080)


# --- Test/log.py ---
# Scratch file exploring the third-party ``nb_log`` package; all experiments
# were commented out in the original and are intentionally not kept.
from nb_log import get_logger


# --- Test/main.py ---
from typing import Union
import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel

app = FastAPI()


@app.get('/')
async def test(request: Request):
    """Echo the caller's host and port."""
    return {"host": request.client.host, 'port': request.client.port}


@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Union[str, None] = None):
    return {"item_id": item_id, "q": f"接口id:{q}"}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)


# --- Test/test_getip.py ---
import requests


# Fix: the API exposes '/getIp/v1/' with a ``num`` query parameter;
# the old path '/get_ip/1' matched no route and always returned 404.
url = 'http://127.0.0.1:8080/getIp/v1/?num=1'
res = requests.get(url)
print(res.text)
# --- Test/test_redis.py ---
import asyncio
import json

from SimplePool.db.ipdb import ProxyMongo
from SimplePool.log_code.log import logger
import datetime


async def test():
    """Ad-hoc exercise of ProxyMongo.update_proxy with a fixed sample payload."""
    c = ProxyMongo()

    response = {
        "code": 0,
        "success": True,
        "msg": "0",
        "data": [
            {
                "ip": "49.68.68.197",
                "port": 6666,
                "expire_time": "2019-05-24 08:58:31",
                "city": "徐州市",
                "isp": "电信"
            },
            {
                "ip": "58.218.201.108",
                "port": 6666,
                "expire_time": "2019-05-24 08:55:31",
                "city": "苏州市",
                "isp": "电信",
                "outip": "219.136.47.161",
            }
        ]
    }
    res = None
    for one in response['data']:
        proxy = f"{one['ip']}:{one['port']}"
        detail = {'_id': proxy, 'city': one.get('city', ''), 'isp': one.get('isp', ''), 'verify_num': 0,
                  'verify_success_rate': 0, 'verify_success_num': 0, 'verify_error_num': 0,
                  'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                  'update_time': '',
                  'expire_time': one.get('expire_time', '')}
        res = await c.update_proxy('IPss', detail)
    print(res)


# Fix: asyncio.get_event_loop()/run_until_complete is deprecated for
# top-level scripts; asyncio.run creates and closes the loop properly.
asyncio.run(test())

import aioredis  # redis client kept for the commented experiments below


# --- Test/test_spider.py ---
"""Spider module containing the ``SpiderMeta`` metaclass and some initial
spider classes. User-defined spiders must use ``SpiderMeta`` and implement
``gets``, which returns proxies in ``ip:port`` form."""

import time


class SpiderMeta(type):
    spiders = []

    def _init(cls):
        """Injected __init__ for subclasses: start the page counter at 1."""
        cls._counter = 1

    def _increment(cls, count):
        """Injected method: advance the page counter by *count*."""
        cls._counter += count

    def _flush(cls):
        """Injected method: reset the page counter to 1."""
        cls._counter = 1

    def __new__(cls, *args, **kwargs):
        """Build the subclass, inject helper methods, and register it.

        :param args: args[0]=name, args[1]=bases, args[2]=attrs
        :raises ValueError: when the class body defines no ``gets``.
        """
        if 'gets' not in args[2]:
            raise ValueError(args[0])

        # Inject default helper methods into the class body.
        args[2]['__init__'] = lambda self: SpiderMeta._init(self)
        args[2]['increment'] = lambda self, count: SpiderMeta._increment(self, count)
        args[2]['flush'] = lambda self: SpiderMeta._flush(self)

        # Fix: the original called type.__new__ twice, so the class object
        # stored in ``spiders`` differed from the one actually returned.
        new_cls = type.__new__(cls, *args, **kwargs)
        SpiderMeta.spiders.append(new_cls)
        return new_cls


# NOTE(review): ``get_page`` is used by every spider below but is never
# defined or imported in this scratch file — these classes cannot run as-is.

class Proxy360Spider(metaclass=SpiderMeta):
    start_url = 'http://www.proxy360.cn/default.aspx'

    def gets(self, page_total=None):
        """Scrape proxy360 and return a list of 'ip:port' strings."""
        ans = []
        soup = get_page(self.start_url)
        for proxy in soup.find_all('div', {'class': 'proxylistitem'}):
            item = proxy.find_all('span', {"class": "tbBottomLine"})
            ip = item[0].get_text().replace('\r\n', '').replace(' ', '')
            port = item[1].get_text().replace('\r\n', '').replace(' ', '')
            ans.append(':'.join([ip, port]))
        return ans


class Daili666Spider(metaclass=SpiderMeta):
    start_url = 'http://www.66ip.cn/{}.html'

    def gets(self, page_total=3):
        """Scrape 66ip pages [counter, counter+page_total) for proxies."""
        urls = [self.start_url.format(i)
                for i in range(self._counter, self._counter + page_total)]
        self.increment(page_total)
        ans = []
        for url in urls:
            soup = get_page(url)
            # 1 s pause between pages to avoid getting banned.
            time.sleep(1)
            proxy_list = soup.find('table', {"border": "2px"})
            for proxy in proxy_list.find_all('tr')[1:]:
                ip = proxy.find_all('td')[0].get_text()
                port = proxy.find_all('td')[1].get_text()
                ans.append(':'.join([ip, port]))
        return ans


class KuaidailiSpider(metaclass=SpiderMeta):
    start_url = 'http://www.kuaidaili.com/free/inha/{}/'

    def gets(self, page_total=2):
        """Scrape kuaidaili pages [counter, counter+page_total) for proxies."""
        urls = [self.start_url.format(i)
                for i in range(self._counter, self._counter + page_total)]
        self.increment(page_total)
        ans = []
        for url in urls:
            soup = get_page(url)
            time.sleep(1)
            proxy_list = soup.find('table',
                                   {'class': 'table table-bordered table-striped'}) \
                .find('tbody')
            for proxy in proxy_list.find_all('tr'):
                tmp = proxy.find_all('td')
                ip = tmp[0].get_text()
                port = tmp[1].get_text()
                ans.append(':'.join([ip, port]))
        return ans


class XiciSpider(metaclass=SpiderMeta):
    start_url = 'http://www.xicidaili.com/nn/{}'

    def gets(self, page_total=2):
        """Scrape xicidaili pages [counter, counter+page_total) for proxies."""
        urls = [self.start_url.format(i)
                for i in range(self._counter, self._counter + page_total)]
        self.increment(page_total)
        ans = []
        for url in urls:
            soup = get_page(url)
            time.sleep(1)
            proxy_list = soup.find('table', {'id': 'ip_list'}) \
                .find_all('tr')[1:]
            for proxy in proxy_list:
                tmp = proxy.find_all('td')
                ip = tmp[1].get_text()
                port = tmp[2].get_text()
                ans.append(':'.join([ip, port]))
        return ans


from fastapi import FastAPI, Query
from urllib.parse import quote
import uvicorn

app = FastAPI()


@app.get("/search/")
def search_items(query_param: str = Query(..., description="Your query parameter")):
    """Echo the URL-encoded query parameter."""
    encoded_query_param = quote(query_param)
    return {"result": f"Searching for {encoded_query_param}"}


if __name__ == '__main__':
    uvicorn.run('test_spider:app', host='0.0.0.0', port=8080)
--- /dev/null +++ b/requirements.yml @@ -0,0 +1,50 @@ +name: IpPool +channels: + - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main + - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r + - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2 + - defaults +dependencies: + - aiohttp=3.9.0=py38h2bbff1b_0 + - aioredis=1.3.1=pyhd3eb1b0_0 + - aiosignal=1.2.0=pyhd3eb1b0_0 + - async-timeout=4.0.3=py38haa95532_0 + - attrs=23.1.0=py38haa95532_0 + - ca-certificates=2023.12.12=haa95532_0 + - click=8.1.7=py38haa95532_0 + - colorama=0.4.6=py38haa95532_0 + - frozenlist=1.4.0=py38h2bbff1b_0 + - greenlet=2.0.1=py38hd77b12b_0 + - h11=0.12.0=pyhd3eb1b0_0 + - hiredis=2.0.0=py38h2bbff1b_0 + - idna=3.4=py38haa95532_0 + - libffi=3.4.4=hd77b12b_0 + - multidict=6.0.4=py38h2bbff1b_0 + - openssl=3.0.12=h2bbff1b_0 + - pip=23.3=py38haa95532_0 + - python=3.8.18=h1aa4202_0 + - redis=3.5.3=pyhd3eb1b0_0 + - setuptools=68.0.0=py38haa95532_0 + - sqlalchemy=2.0.21=py38h2bbff1b_0 + - sqlite=3.41.2=h2bbff1b_0 + - typing=3.10.0.0=py38haa95532_0 + - typing_extensions=4.7.1=py38haa95532_0 + - uvicorn=0.20.0=py38haa95532_0 + - vc=14.2=h21ff451_1 + - vs2015_runtime=14.27.29016=h5e58377_2 + - wheel=0.41.2=py38haa95532_0 + - yarl=1.9.3=py38h2bbff1b_0 + - pip: + - annotated-types==0.6.0 + - anyio==3.7.1 + - dnspython==2.4.2 + - exceptiongroup==1.1.3 + - fastapi==0.104.1 + - motor==3.3.2 + - pydantic==2.4.2 + - pydantic-core==2.10.1 + - pymongo==4.6.1 + - sniffio==1.3.0 + - starlette==0.27.0 + - typing-extensions==4.8.0 +prefix: D:\Conda\envs\IpPool