You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

110 lines
3.5 KiB

# coding=utf-8
import asyncio
import json
import os
from dataclasses import asdict
from datetime import datetime
from playwright.async_api import async_playwright
import config
from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer
from spiders.ai_seo import *
from spiders.ai_seo.wenxiaoyan import WenxiaoyanSpider
from spiders.ai_seo.workflow import WorkFlowApiSpider
from utils.logger_utils import create_logger
from utils.ai import AiSeoApis
# Shared application logger for this module.
logger = create_logger("app")
# Maps a numeric platform_id to the browser-automation spider class that
# handles that AI platform. Ids 2 and 5 also appear here but are normally
# routed through the workflow API instead (see WORKFLOW_PLATFORM_IDS).
SPIDER_CLS = {
1: DeepseekSpider,
2: TongyiSpider,
3: YuanBaoSpider,
4: KimiSpider,
5: DouBaoSpider,
6: YiYanSpider,
7: NanometerSpider,
13: MetasoSpider,
8: WenxiaoyanSpider
}
# Platform ids served via WorkFlowApiSpider (HTTP workflow API) rather than
# a browser-driven spider.
WORKFLOW_PLATFORM_IDS = [2, 5]
async def init_browser() -> tuple:
    """
    Initialize a Playwright driver and launch a Chrome browser.

    :return: ``(playwright, browser)`` — the caller is responsible for
        closing the browser and stopping the playwright driver.
    """
    playwright = await async_playwright().start()
    browser = await playwright.chromium.launch(
        headless=config.BROWSER_HANDLESS,
        chromium_sandbox=config.BROWSER_ENABLE_SANDBOX,
        ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS,
        channel="chrome",
        args=config.BROWSER_ARGS,
    )
    # NOTE(fix): the original created an unused BrowserContext here
    # ("content = await browser.new_context()") that was never returned or
    # closed — a resource leak. Spiders receive the browser object and can
    # open their own contexts, so the stray context is simply removed.
    return playwright, browser
def get_spider(platform_id, prompt, brand, browser) -> AbstractAiSeoSpider | WorkFlowApiSpider:
    """
    Build the spider instance for a platform.

    :param platform_id: numeric platform id (int or numeric string).
    :param prompt: question/keyword to ask the platform.
    :param brand: brand keyword passed through to the spider.
    :param browser: shared Playwright browser (unused for workflow platforms).
    :raises ValueError: if no spider class is registered for the id.
    """
    # Normalize once: the original compared the *raw* value against
    # WORKFLOW_PLATFORM_IDS but int()-coerced only for the dict lookup, so a
    # string id like "2" would wrongly skip the workflow path.
    platform_id = int(platform_id)
    if platform_id in WORKFLOW_PLATFORM_IDS:
        return WorkFlowApiSpider(prompt, brand, platform_id)
    cls = SPIDER_CLS.get(platform_id)
    if not cls:
        raise ValueError(f"未找到对应的爬虫类,platform_id={platform_id}")
    return cls(browser, prompt, brand)
def save_local(ai_answer: AiAnswer):
    """
    Persist an AiAnswer as pretty-printed JSON under
    ``./data/<platform_name>/<YYYY-MM-DD>/<prompt>.json``.

    :param ai_answer: dataclass instance to serialize (via ``asdict``).
    """
    now = datetime.now().strftime("%Y-%m-%d")
    base_path = f'./data/{ai_answer.platform_name}/{now}'
    # exist_ok=True avoids the check-then-create race of the original
    # "if not os.path.exists(...): os.makedirs(...)" pair.
    os.makedirs(base_path, exist_ok=True)
    # NOTE(review): ai_answer.prompt is used verbatim as a filename; prompts
    # containing '/' or other path characters would break this — confirm
    # prompts are sanitized upstream.
    json_file_path = f'{base_path}/{ai_answer.prompt}.json'
    json_str = json.dumps(asdict(ai_answer), indent=4, ensure_ascii=False)
    with open(json_file_path, 'w', encoding='utf-8') as f:
        f.write(json_str)
    logger.info(f"[{ai_answer.platform_name}]{ai_answer.prompt} 保存成功: {base_path}")
async def test():
    """Run every configured test keyword against every configured platform,
    saving each answer locally and pacing requests with TEST_INTERVAL."""
    playwright, browser = await init_browser()
    prompts = config.TEST_KEYWORDS
    for index, prompt in enumerate(prompts, start=1):
        logger.info(f"[{index}/{len(prompts)}] {prompt}")
        for platform in config.TEST_PLATFORM:
            spider = get_spider(platform, prompt, '品牌词', browser)
            ai_answer = await spider.run()
            if ai_answer:
                save_local(ai_answer)
            # short pause between platforms
            await asyncio.sleep(config.TEST_INTERVAL)
        # longer pause between keywords
        await asyncio.sleep(config.TEST_INTERVAL * 6)
async def check_session_by_platform_id(platform_id):
    """
    Validate every stored spider session for one platform.

    :param platform_id: platform whose sessions are fetched and checked.
    """
    sessions = await AiSeoApis.list_spider_session(platform_id)
    playwright, browser = await init_browser()
    try:
        for session in sessions:
            spider = get_spider(session['platform_id'], '你好', '品牌词', browser)
            await spider.check_session(session['id'])
    finally:
        # NOTE(fix): the original never closed the browser or stopped the
        # playwright driver, leaking a Chrome process per call.
        await browser.close()
        await playwright.stop()
async def check_session(platform_ids=None):
    """Sequentially check stored sessions for each given platform id.

    :param platform_ids: iterable of platform ids; ``None`` means none.
    """
    ids = platform_ids if platform_ids is not None else []
    logger.info(f"开始检查session {len(ids)}个平台")
    for pid in ids:
        await check_session_by_platform_id(pid)
    logger.info(f"检查session完成")
if __name__ == '__main__':
    # asyncio.run() creates, runs, and properly closes a fresh event loop;
    # asyncio.get_event_loop().run_until_complete() is deprecated for this
    # use since Python 3.10 and leaves the loop open.
    asyncio.run(test())