Handle site homepage

master
zhangwen, 2 weeks ago
commit 49235b7de0
  1. htmldata_get/.idea/htmldata_get.iml (+8)
  2. htmldata_get/api_main.py (+118)
  3. htmldata_get/main.py (+139)
  4. htmldata_get/res.json (+1)
  5. htmldata_get/spider.db (BIN)
  6. htmldata_get/sql.py (+18)
  7. htmldata_get/t.py (+75)
  8. htmldata_get/t/1.html (+2)
  9. htmldata_get/t/t2.py (+19)
  10. htmldata_get/tools/class_int.py (+39)
  11. htmldata_get/tools/deal_html.py (+138)
  12. htmldata_get/tools/retry.py (+19)

htmldata_get/.idea/htmldata_get.iml (+8)

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$" />
    <orderEntry type="inheritedJdk" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>

htmldata_get/api_main.py (+118)

@@ -0,0 +1,118 @@
from main import Start
from fastapi import FastAPI
from pydantic import BaseModel
from threading import Thread
import queue
import sqlite3
import json
import hashlib

app = FastAPI()
task_queue = queue.Queue()


def get_conn():
    return sqlite3.connect("spider.db", check_same_thread=False)


class Spider(BaseModel):
    url: str


# 🔁 background worker: pulls (task_id, url) off the queue and crawls
def worker():
    while True:
        task_id, url = task_queue.get()
        conn = get_conn()
        cur = conn.cursor()
        try:
            data = Start(url).run()
            cur.execute("""
                INSERT OR REPLACE INTO tasks (task_id, status, result)
                VALUES (?, ?, ?)
            """, (task_id, 0, json.dumps(data, ensure_ascii=False)))
        except Exception as e:
            cur.execute("""
                UPDATE tasks
                SET status=?,
                    error=?
                WHERE task_id = ?
            """, ("error", str(e), task_id))
        conn.commit()
        conn.close()
        task_queue.task_done()


Thread(target=worker, daemon=True).start()


# ✅ submit a task
@app.post("/crawler/put", summary='Submit a crawl task')
def put_task(req: Spider):
    """
    url: the page URL to crawl
    """
    task_id = hashlib.md5(req.url.encode('utf-8')).hexdigest()
    print(task_id)
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("""
        INSERT OR REPLACE INTO tasks (task_id, url, status)
        VALUES (?, ?, ?)
    """, (task_id, req.url, -2))
    conn.commit()
    conn.close()
    task_queue.put((task_id, req.url))
    return {'code': 0, 'data': {"task_id": task_id}, 'msg': 'success'}


# ✅ fetch the result
@app.get("/crawler/get/{task_id}", summary='Fetch crawl result')
def get_result(task_id: str):
    """
    task_id: as returned by /crawler/put
    code: 0 = done, -2 = queued, -1 = failed or unknown task_id
    """
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("SELECT status, result, error FROM tasks WHERE task_id=?", (task_id,))
    row = cur.fetchone()
    conn.close()
    if not row:
        return {"code": -1, 'data': task_id, 'msg': 'no such task_id'}
    status, result, error = row
    if status == "error":
        return {"code": -1, 'data': task_id, 'msg': 'task failed'}
    if int(status) == 0:
        msg = 'success'
    else:
        msg = 'task is still being processed'
    return {
        "code": int(status),
        "data": json.loads(result) if result else None,
        "msg": msg
    }

# uvicorn api_main:app --host 0.0.0.0 --port 8000
# uvicorn api_main:app --host 0.0.0.0 --port 32000 --log-level debug
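
For reference, a minimal client sketch against these two endpoints, assuming the service was started with the first uvicorn command above (host, port, and poll interval are placeholders):

import time
import requests

BASE = "http://127.0.0.1:8000"  # placeholder: matches the uvicorn command above

# submit a crawl task; the task_id is the MD5 hash of the URL
resp = requests.post(f"{BASE}/crawler/put", json={"url": "https://www.essilor.com/cn-zh/"})
task_id = resp.json()["data"]["task_id"]

# poll until the worker flips status from -2 (queued) to 0 (done) or -1 (failed)
while True:
    result = requests.get(f"{BASE}/crawler/get/{task_id}").json()
    if result["code"] in (0, -1):
        break
    time.sleep(2)
print(result["msg"])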

htmldata_get/main.py (+139)

@@ -0,0 +1,139 @@
import random
import time
from urllib.parse import urljoin, urlparse
from tools.class_int import RequestsInt
from tools.deal_html import deal_html
from bs4 import BeautifulSoup
import json


class Start(object):
    def __init__(self, url):
        self.url = url
        self.requests = RequestsInt(url)
        self.res = []
        # ❌ resource extensions to skip
        self.exclude_ext = {
            ".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg",
            ".mp4", ".avi", ".mov", ".wmv", ".flv", ".mkv",
            ".mp3", ".wav",
            ".pdf", ".zip", ".rar", ".7z",
            ".css", ".js", ".ico", ".woff", ".woff2", ".ttf"
        }

    # ✅ does this URL look like an HTML page?
    def is_valid_page(self, url):
        path = urlparse(url).path.lower()
        for ext in self.exclude_ext:
            if path.endswith(ext):
                return False
        return True

    def get_internal_links(self, html):
        soup = BeautifulSoup(html, "lxml")
        base_domain = urlparse(self.url).netloc
        links = set()
        for a in soup.find_all("a", href=True):
            href = a["href"].strip()
            # ❌ skip non-navigational schemes
            if href.startswith(("javascript:", "#", "mailto:", "tel:")):
                continue
            # ✅ resolve to an absolute URL
            full_url = urljoin(self.url, href)
            # ✅ same domain only
            if urlparse(full_url).netloc != base_domain:
                continue
            # ✅ skip static resources
            if not self.is_valid_page(full_url):
                continue
            links.add(full_url)
        return list(links)

    def deal(self, url, html=None):
        if not html:
            resp = self.requests.get(url)
            if resp.status_code == 200:
                html = resp.text
            else:
                print(f'Unexpected response status, skipping: {resp.status_code}')
                return {}
        data = deal_html(html, url)
        self.res.append(data)
        return data

    def dealpage(self, url, html):
        data = deal_html(html, url)
        self.res.append(data)
        return data

    def _time_sleep(self, k=None):
        if not k:
            k = random.uniform(0, 1)
        print('Sleeping:', k)
        time.sleep(k)

    def fet_all_links(self, html):
        all_links = self.get_internal_links(html)
        print("Second-level page links:", all_links)
        print('Number of second-level pages:', len(all_links))
        return all_links

    def run(self):
        html = self.requests.get(self.url).text
        res = self.deal(url=self.url, html=html)
        print(f'Homepage processed: {res.get("title")}')
        all_links = self.fet_all_links(html)
        if len(all_links) > 0:
            print('Static page ---')
            for index, link in enumerate(all_links):
                try:
                    res = self.deal(url=link)
                    print(f'{link}: {res.get("title")} {index + 1}')
                    self._time_sleep()
                except Exception as e:
                    print(f'Error: {link} {e}')
            print(f'All pages processed: {self.url} {len(self.res)}')
            return self.res
        else:
            # no links in the raw HTML: the page is likely rendered by JS,
            # so re-fetch it with DrissionPage and try again
            print('Dynamic page ---')
            html = self.requests.get_page(self.url)
            res = self.dealpage(url=self.url, html=html)
            print(f'Homepage processed: {res.get("title")}')
            all_links = self.fet_all_links(html)
            for index, link in enumerate(all_links):
                try:
                    res = self.deal(url=link)
                    print(f'{link}: {res.get("title")} {index + 1}')
                    self._time_sleep()
                except Exception as e:
                    print(f'Error: {link} {e}')
            print(f'All pages processed: {self.url} {len(self.res)}')
            return self.res


class StartPage(Start):
    pass


if __name__ == '__main__':
    url = 'https://www.essilor.com/cn-zh/'
    d = Start(url).run()
    with open('res.json', 'w', encoding='utf-8') as f:
        f.write(json.dumps(d, ensure_ascii=False))

htmldata_get/res.json (+1)

@@ -0,0 +1 @@
[{"url": "https://www.essilor.com/cn-zh/", "title": "World leader in prescription lenses | Essilor", "keywords": "", "description": "Everyone everywhere should experience the life changing benefits of vision correction and vision protection. Choose your lenses from a committed brand.", "content": "World leader in prescription lenses | Essilor"}, {"url": "https://www.essilor.com/cn-zh/", "title": "World leader in prescription lenses | Essilor", "keywords": "", "description": "Everyone everywhere should experience the life changing benefits of vision correction and vision protection. Choose your lenses from a committed brand.", "content": "World leader in prescription lenses | Essilor"}]

htmldata_get/spider.db (BIN)

htmldata_get/sql.py (+18)

@@ -0,0 +1,18 @@
import sqlite3

conn = sqlite3.connect("spider.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    url TEXT,
    status TEXT,
    result TEXT,
    error TEXT,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
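
The status column tracks the task lifecycle: /crawler/put inserts -2 (queued), then the worker overwrites it with 0 (done) or the string "error" (failed). A quick inspection sketch over the same database:

import sqlite3

conn = sqlite3.connect("spider.db")
for task_id, status, create_time in conn.execute(
        "SELECT task_id, status, create_time FROM tasks ORDER BY create_time DESC"):
    # status: -2 = queued, 0 = done, 'error' = failed (the column is TEXT)
    print(task_id, status, create_time)
conn.close()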

htmldata_get/t.py (+75)

@@ -0,0 +1,75 @@
import requests
from bs4 import BeautifulSoup
import trafilatura


def fetch_html(url):
    headers = {"User-Agent": "Mozilla/5.0"}
    resp = requests.get(url, headers=headers, timeout=10)
    resp.encoding = resp.apparent_encoding
    return resp.text


def extract_meta(soup):
    title = (soup.title.string or "").strip() if soup.title else ""

    def get_meta(name):
        tag = soup.find("meta", attrs={"name": name})
        if tag and tag.get("content"):
            return tag["content"].strip()
        return ""

    # fall back to OG tags
    def get_og(prop):
        tag = soup.find("meta", attrs={"property": prop})
        if tag and tag.get("content"):
            return tag["content"].strip()
        return ""

    keywords = get_meta("keywords")
    description = get_meta("description")
    if not title:
        title = get_og("og:title")
    if not description:
        description = get_og("og:description")
    return title, keywords, description


def extract_text(html):
    # ⭐ prefer trafilatura's main-content extraction
    text = trafilatura.extract(html)
    if text:
        return text
    # 🔁 fallback in case extraction fails
    soup = BeautifulSoup(html, "lxml")
    for tag in soup(["script", "style", "noscript"]):
        tag.extract()
    return soup.get_text(separator="\n")


def parse_page(url):
    html = fetch_html(url)
    soup = BeautifulSoup(html, "lxml")
    title, keywords, description = extract_meta(soup)
    content = extract_text(html)
    return {
        "url": url,
        "title": title,
        "keywords": keywords,
        "description": description,
        "content": content
    }


if __name__ == "__main__":
    url = "https://www.lheia.com/hzhb.html"
    data = parse_page(url)
    for k, v in data.items():
        print(f"{k}:\n{str(v)[:300]}\n")

htmldata_get/t/1.html (+2)

File diff suppressed because it is too large

htmldata_get/t/t2.py (+19)

@@ -0,0 +1,19 @@
# # Imports
# from DrissionPage import SessionPage
# # Create the page object
# page = SessionPage()
# # Visit the page
# page.get('https://www.essilor.com/cn-zh/')
# print(page.html)
# print(page)
# page.close()

# Imports
from DrissionPage import ChromiumPage

# Create the page object
page = ChromiumPage()
# Visit the page
page.get('https://www.essilor.com/cn-zh/')
print(page.html)
print(page)
page.close()

htmldata_get/tools/class_int.py (+39)

@@ -0,0 +1,39 @@
import requests
from tools.retry import retry
from DrissionPage import SessionPage


class RequestsInt(object):
    def __init__(self, url):
        self.ref = url
        self.session = requests.Session()

    def get_headers(self):
        return {
            'referer': self.ref,
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36'
        }

    @retry('GET request', 3)
    def get(self, url, params=None, headers=None, timeout=5, **kwargs):
        if headers is None:
            headers = self.get_headers()
        r = self.session.get(url=url, params=params, headers=headers, timeout=timeout, **kwargs)
        r.encoding = 'utf-8'
        return r

    @retry('POST request', 3)
    def post(self, url, params=None, headers=None, timeout=5, **kwargs):
        if headers is None:
            headers = self.get_headers()
        r = self.session.post(url=url, params=params, headers=headers, timeout=timeout, **kwargs)
        r.encoding = 'utf-8'
        return r

    def get_page(self, url):
        # fetch via DrissionPage; grab the html before closing the session
        page = SessionPage()
        page.get(url)
        html = page.html
        page.close()
        return html
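
A short usage sketch (the target URL is only an example): get returns a requests.Response, or False once the retry decorator in tools/retry.py gives up, so callers should check before touching the response; get_page returns the HTML string fetched through DrissionPage.

from tools.class_int import RequestsInt

client = RequestsInt('https://www.essilor.com/cn-zh/')  # the URL doubles as the referer
resp = client.get('https://www.essilor.com/cn-zh/')
if resp is not False and resp.status_code == 200:
    print(len(resp.text))
html = client.get_page('https://www.essilor.com/cn-zh/')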

htmldata_get/tools/deal_html.py (+138)

@@ -0,0 +1,138 @@
import re
from bs4 import BeautifulSoup
import trafilatura


def extract_meta(soup):
    title = (soup.title.string or "").strip() if soup.title else ""

    def get_meta(name):
        tag = soup.find("meta", attrs={"name": name})
        if tag and tag.get("content", ''):
            return tag["content"].strip()
        return ""

    # fall back to OG tags
    def get_og(prop):
        tag = soup.find("meta", attrs={"property": prop})
        if tag and tag.get("content", ''):
            return tag["content"].strip()
        return ""

    keywords = get_meta("keywords")
    description = get_meta("description")
    if not title:
        title = get_og("og:title")
    if not description:
        description = get_og("og:description")
    return title, keywords, description


def extract_text(html):
    # ⭐ prefer trafilatura's main-content extraction
    text = trafilatura.extract(html)
    if text:
        return text
    # 🔁 fallback in case extraction fails
    soup = BeautifulSoup(html, "lxml")
    for tag in soup(["script", "style", "noscript"]):
        tag.extract()
    return soup.get_text(separator="\n")


def extract_news(html):
    soup = BeautifulSoup(html, "lxml")
    results = []
    for a in soup.find_all("a", href=True):
        # 👉 headline
        h = a.find(["h1", "h2", "h3", "h4", "h5"])
        if not h:
            continue
        title = h.get_text(strip=True)
        # 👉 timestamp (common classes: time / date); the parentheses matter,
        #    since `and` binds tighter than `or`
        time_tag = a.find(lambda tag: tag.name in ["div", "span"] and
                          ("time" in (tag.get("class") or []) or
                           "date" in (tag.get("class") or [])))
        time_text = time_tag.get_text(strip=True) if time_tag else ""
        # ❗drop titles too short to be real headlines
        if len(title) < 5:
            continue
        results.append({
            "title": title,
            "text": time_text,
        })
    return results


def extract_blocks(html):
    soup = BeautifulSoup(html, "lxml")
    results = []
    # 👉 collect every a / div that might be a content block
    candidates = soup.find_all(["a", "div"])
    for tag in candidates:
        text = tag.get_text(strip=True)
        # ❗skip blocks that are too short (noise)
        if len(text) < 10:
            continue
        # 👉 take the first child with enough text as the block's text
        for t in tag.find_all(["h1", "h2", "h3", "p", "span", "div"]):
            txt = t.get_text(strip=True)
            if len(txt) >= 10:
                print(txt)
                results.append({
                    "text": txt,
                })
                break
    return results


def extract_chinese_and_english(html):
    # NOTE: despite the name, only Chinese runs are extracted here
    if not html:
        return []
    # [\u4e00-\u9fff] covers common Simplified/Traditional characters
    # \u3400-\u4dbf covers CJK Extension A (rare and archaic characters)
    chinese_pattern = re.compile(r'[\u4e00-\u9fff\u3400-\u4dbf]+', re.UNICODE)
    chinese_list = chinese_pattern.findall(html)
    return chinese_list


def deal_html(html, url):
    soup = BeautifulSoup(html, "lxml")
    title, keywords, description = extract_meta(soup)
    content = extract_text(html)
    return {
        'url': url,
        "title": title,
        "keywords": keywords,
        "description": description,
        "content": content
    }


if __name__ == '__main__':
    with open('../t/1.html', 'r', encoding='utf-8') as f:
        h = f.read()
    d = extract_chinese_and_english(h)
    print(d)
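
A small self-check for extract_news on a hand-written snippet (the HTML below is made up for illustration):

from tools.deal_html import extract_news

sample = '''
<a href="/news/1"><h3>Homepage crawler ships today</h3>
  <span class="date">2024-01-01</span></a>
<a href="/news/2"><h3>No timestamp on this item</h3></a>
'''
print(extract_news(sample))
# expected:
# [{'title': 'Homepage crawler ships today', 'text': '2024-01-01'},
#  {'title': 'No timestamp on this item', 'text': ''}]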

htmldata_get/tools/retry.py (+19)

@@ -0,0 +1,19 @@
from functools import wraps


def retry(name='name', for_word=3):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(for_word):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f'[{name}]: {e}')
            # all attempts failed: swallow the exception and signal failure
            return False
        return wrapper
    return decorator
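
Because the wrapper returns False instead of re-raising after for_word failed attempts, callers must check the return value. A minimal usage sketch (flaky is a hypothetical function added for illustration):

import random
from tools.retry import retry

@retry('flaky demo', 3)
def flaky():
    # hypothetical: fails about two times in three to exercise the retry loop
    if random.random() < 0.66:
        raise RuntimeError('transient failure')
    return 'ok'

result = flaky()
print('all attempts failed' if result is False else result)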