德胜云资讯,添加一些关于程序相关的内容,仅供大家学习交流(https://www.wxclwl.com)

网站地图

搜索
德胜云咨询
后端分类 爬虫 网络安全 PHP SQL
热门标签:
最新标签:

免费爬虫ip干货分享,程序员自建代理ip池,轻松爬取数据不封ip没有反爬虫。http爬虫代理一篇读懂,

日期:2023/04/25 00:55作者:李建智人气:

导读:''' 初始化 :param host:地址 :param po...

代理池主要分为4个模块:存储模块、获取模块、检测模块、接口模块无私分享全套Python爬虫干货,如果你也想学习Python,@ 私信小编获取

存储模块

这里我们使用Redis的有序集合,集合的每一个元素都是不重复的。另外,有序集合的每一个元素都有一个分数字段。

具体代码实现如下(ippool_save.py)

MAX_SCORE = 100 #最高分 MIN_SCORE = 0 #最低分 INITIAL_SCORE = 10 #初始分数 REDIS_HOST = localhost REDIS_PORT = 6379 REDIS_PASSWORD = None REDIS_KEY = proxies #键名 import redis from random import choice class PoolEmptyError(): def __str__(self): return PoolEmptyError class RedisClient(object): def __init__(self,host=REDIS_HOST,port=REDIS_PORT,password=REDIS_PASSWORD): 初始化 :param host:地址 :param port: 端口号 :param password: 密码 self.db = redis.StrictRedis(host=host,port=port,password=password,decode_responses=True) def add(self,proxy,score=INITIAL_SCORE): 添加代理,设置初始分数 :param proxy: 代理 :param score: 分数 :return: 添加结果 if not self.db.zscore(REDIS_KEY,proxy): return self.db.zadd(REDIS_KEY,{proxy:score}) def random(self): 随即获取有效代理,首先尝试获取最高分数代理,如果最高分数不存在,则按照排名获取 :return: result = self.db.zrangebyscore(REDIS_KEY,MAX_SCORE,MAX_SCORE) if len(result): return choice(result) else: result = self.db.zrevrange(REDIS_KEY,0,100) if len(result): return choice(result) else: raise PoolEmptyError def decrease(self, proxy): 代理值减一分,分数小于最小值,则代理删除 :param proxy: 代理 :return: 修改后的代理分数 score = self.db.zscore(REDIS_KEY,proxy) if score and score>MIN_SCORE: print("代理",proxy,"当前分数",score,"减1") return self.db.zincrby(REDIS_KEY,-1,proxy) else: print("代理",proxy,"当前分数",score,"移除") return self.db.zrem(REDIS_KEY,proxy) def exists(self,proxy): 判断是否存在 :param proxy: 代理 :return: 是否存在 return not self.db.zscore(REDIS_KEY,proxy) == None def max(self,proxy): 将代理设置为MAX_SCORE :param proxy: 代理 :return: 设置结果 print("代理",proxy,"可用,设置为",MAX_SCORE) return self.db.zadd(REDIS_KEY,{proxy:MAX_SCORE}) def count(self): 获取数量 :return:数量 return self.db.zcard(REDIS_KEY) def all(self): 获取全部代理 :return: 全部代理列表 return self.db.zrangebyscore(REDIS_KEY,MIN_SCORE,MAX_SCORE)

获取模块

获取模块的逻辑相对简单,首先要定义一个ippool_crawler.py来从各大网站抓取,具体代码如下:

import json import requests from lxml import etree from ippool_save import RedisClient class ProxyMetaclass(type): #参数依次是当前准备创建的类的对象;类的名字;类继承的父类集合;类的方法集合。 def __new__(cls, name,bases,attrs): count = 0 attrs[__CrawlFunc__] = [] for k,v in attrs.items(): if crawl_ in k: attrs[__CrawlFunc__].append(k) count+=1 attrs[__CrawlFuncCount__] = count return type.__new__(cls,name,bases,attrs) class Crawler(object,metaclass=ProxyMetaclass): def __init__(self): self.proxy = RedisClient().random() self.proxies = { http: http:// + self.proxy, https: https:// + self.proxy } def get_proxies(self,callback): proxies = [] for proxy in eval("self.{}()".format(callback)): print(成功获取代理,proxy) proxies.append(proxy) return proxies

我们还需要定义一个Getter类,用来动态地调用所有以crawl开头的方法,然后获取抓取到的代理,将其加入到数据库存储起来,具体代码如下(ippool_getter.py)

from ippool_save import RedisClient from ippool_crawler import Crawler POOL_UPPER_THRESHOLD = 1000 class Getter(): def __init__(self): self.redis = RedisClient() self.crawler = Crawler() def is_over_threshold(self): if self.redis.count() >= POOL_UPPER_THRESHOLD: return True else: return False def run(self): print("获取器开始执行") if not self.is_over_threshold(): for callback_label in range(self.crawler.__CrawlFuncCount__): callback = self.crawler.__CrawlFunc__[callback_label] proxies = self.crawler.get_proxies(callback) for proxy in proxies: self.redis.add(proxy)

检测模块

我们已经将各个网站的代理都抓取下来了现在就需要一个检测模块来对所有代理进行多轮检测。

VALID_STATUS_CODES = [200] TEST_URL = "http://www.baidu.com" BATCH_TEST_SIZE = 100 from ippool_save import RedisClient import aiohttp import asyncio import time class Tester(object): def __init__(self): self.redis = RedisClient() async def test_single_proxy(self,proxy): conn = aiohttp.TCPConnector(verify_ssl=False) async with aiohttp.ClientSession(connector=conn) as session: try: if isinstance(proxy,bytes): proxy = proxy.decode(utf-8) real_proxy = http://+ proxy print("正在测试",proxy) async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response: if response.status in VALID_STATUS_CODES: self.redis.max(proxy) print(代理可用,proxy) else: self.redis.decrease(proxy) print(请求响应码不合法,proxy) except (TimeoutError,ArithmeticError): self.redis.decrease(proxy) print(代理请求失败,proxy) def run(self): print(测试开始运行) try: proxies = self.redis.all() loop = asyncio.get_event_loop() for i in range(0,len(proxies),BATCH_TEST_SIZE): test_proxies = proxies[i:i+BATCH_TEST_SIZE] tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] loop.run_until_complete(asyncio.wait(tasks)) time.sleep(5) except Exception as e: print(测试器发生错误, e.args)

接口模块

为了更方便地获取可用代理,我们增加了一个接口模块。

使用Flask来实现这个接口模块,实现代码如下(ippool_api.py)

from flask import Flask,g from ippool_save import RedisClient __all__ = [app] app = Flask(__name__) def get_conn(): if not hasattr(g,redis): g.redis = RedisClient() return g.redis @app.route(/) def index(): return <h2>Welcome to Proxy Pool System</h2> @app.route(/random) def get_proxy(): conn = get_conn() return conn.random() @app.route(/count) def get_counts(): conn = get_conn() return str(conn.count()) if __name__ == __main__: app.run()

调度模块

调度模块就是调用以上定义的3个模块,将这3个模块通过多进程的形式运行起来。

最后,只需要调用Scheduler的run()方法即可启动整个代码池。

TESTER_CYCLE = 20 GETTER_CYCLE = 20 TESTER_ENABLED = True GETTER_ENABLED = True API_ENABLED = True from multiprocessing import Process from ippool_api import app from ippool_getter import Getter from ippool_check import Tester import time class Scheduler(): def schedule_tester(self,cycle=TESTER_CYCLE): tester = Tester() while True: print(测试器开始运行) tester.run() time.sleep(cycle) def schedule_getter(self,cycle=GETTER_CYCLE): getter = Getter() while True: print(开始抓取代理) getter.run() time.sleep(cycle) def schedule_api(self): app.run() def run(self): print(代理池开始运行) if TESTER_ENABLED: tester_process = Process(target=self.schedule_tester) tester_process.start() if GETTER_ENABLED: getter_process = Process(target=self.schedule_getter) getter_process.start() if API_ENABLED: api_process = Process(target=self.schedule_api) api_process.start() if __name__ == __main__: Scheduler().run()

为了帮助大家更轻松的学好Python,我给大家分享一套Python学习资料,希望对正在学习的你有所帮助!

获取方式:关注并私信小编 “ 学习 ”,即可免费获取!

排行

网站地图

Copyright © 2002-2022 香港德胜云网络 版权所有 | 备案号:蜀ICP备2023007363号-5

声明: 本站内容全部来自互联网,非盈利性网站仅供学习交流