Amazon Page Design Notes

Articles — plans and ideas for operating tips, trial-and-error experiments, and success stories; filter all Amazon-related posts and set their parent to this column.

Market — Google Trends, keyword popularity, and the points of interest raised in reviews.

Product — competitor rankings, sales volume, keyword changes, price changes, ranking changes, and how they relate to search-result slots.

Store — store performance and order-release calculations, advertising plan changes, promotion plans, price and profit.

Crawler framework (a storage sketch follows this list)
Keyword —> product slots under that keyword

Keyword —> BSR rankings under that keyword

Keyword —> price distribution under that keyword

Keyword —> sales volume under that keyword

Keyword —> search popularity of that keyword

Keyword —> rating distribution under that keyword

BSR —> price distribution of the top n in the (major or minor) category

BSR —> keyword frequency of the top n in the (major or minor) category

BSR —> sales volume of the top n in the (major or minor) category

BSR —> ratings of the top n in the (major or minor) category

ASIN —> the product's keywords

ASIN —> the product's ranking

ASIN —> the product's sales volume

ASIN —> the product's price

ASIN —> the product's reviews

ASIN —> the product's rating
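
A minimal sketch of how these keyword / BSR / ASIN mappings could be stored, assuming Python dataclasses; the class and field names (KeywordSnapshot, BsrSnapshot, AsinSnapshot) are illustrative assumptions, not an existing schema.

# Illustrative data model only; names are assumptions, not an existing schema
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class KeywordSnapshot:
    keyword: str
    positions: dict = field(default_factory=dict)     # asin -> (page, slot)
    bsr_ranks: dict = field(default_factory=dict)      # asin -> BSR rank
    prices: dict = field(default_factory=dict)         # asin -> price
    volumes: dict = field(default_factory=dict)        # asin -> sales volume
    search_heat: Optional[int] = None                  # search popularity of the keyword
    ratings: dict = field(default_factory=dict)        # asin -> rating


@dataclass
class BsrSnapshot:
    category: str
    top_n: int
    prices: list = field(default_factory=list)         # price distribution of the top n
    keyword_freq: dict = field(default_factory=dict)   # keyword -> frequency in top-n listings
    volumes: list = field(default_factory=list)        # sales volumes of the top n
    ratings: list = field(default_factory=list)        # ratings of the top n


@dataclass
class AsinSnapshot:
    asin: str
    keywords: list = field(default_factory=list)       # keywords of the product
    rank: Optional[int] = None                         # BSR rank
    volume: Optional[int] = None                       # sales volume
    price: Optional[str] = None                        # price
    reviews: list = field(default_factory=list)        # reviews
    rating: Optional[float] = None                     # rating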

Instance methods (stubbed out as a sketch after this list)
scrapy_by_keyword(keyword  # search keyword)

scrapy_by_bsr(category  # category, type  # major/minor)

scrapy_by_asin(asin  # asin)

get_asin(table  # source data table)

get_keywords(type  # title / 5 bullet points, asin  # asin)

get_review(type  # 1-5 stars, asin  # asin)

get_rate(asin  # asin)

get_position(keyword  # keyword, asin  # asin)

get_volume(asin  # asin, bsr  # best seller rank)

get_price(asin  # asin)
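
As a rough sketch, the planned methods above could be stubbed out like this; the signatures simply mirror the list, the class name AmazonToolkit is hypothetical, and none of the bodies exist yet.

# Stub sketch of the planned interface; signatures mirror the list above, bodies are placeholders
class AmazonToolkit:
    def scrapy_by_keyword(self, keyword): ...       # scrape by search keyword
    def scrapy_by_bsr(self, category, type): ...    # scrape a (major/minor) BSR category
    def scrapy_by_asin(self, asin): ...             # scrape one product detail page
    def get_asin(self, table): ...                  # read ASINs from a source data table
    def get_keywords(self, type, asin): ...         # keywords from the title / 5 bullet points
    def get_review(self, type, asin): ...           # reviews filtered by 1-5 stars
    def get_rate(self, asin): ...                   # product rating
    def get_position(self, keyword, asin): ...      # slot position of an ASIN under a keyword
    def get_volume(self, asin, bsr): ...            # sales volume / best seller rank
    def get_price(self, asin): ...                  # price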

# Captcha recognition module
import re
import threading
import time, io
import random

from PIL import Image
import pytesseract

# import muggle_ocr
from playwright.sync_api import sync_playwright

# Tesseract OCR config: treat the image as a single word, alphanumeric whitelist
custom_config = r'--psm 8 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
 
 
# sdk = muggle_ocr.SDK(model_type=muggle_ocr.ModelType.Captcha)
 
def extract_numbers(text):
    """
    Extract numbers from text, removing bracketed content first.
    Supported number formats:
    - integers: 123, 1,234
    - decimals: 123.45, 1,234.56

    Args:
        text (str): input text

    Returns:
        list: extracted numbers as ints (',' and '.' are stripped as thousands
              separators before conversion)
    """
    # Remove brackets and their contents:
    # handles (), [], {} as well as fullwidth （） brackets
    pattern_brackets = r'[\(（].*?[\)）]|\[.*?\]|\{.*?\}'
    text_without_brackets = re.sub(pattern_brackets, '', text)

    # Regex matching integers and decimals, with or without thousands separators
    pattern_numbers = r'(?:[\d,]+\.?\d*|\d*\.?\d+)'

    # Extract all matching number strings
    number_strings = re.findall(pattern_numbers, text_without_brackets)

    # Convert to int, stripping separators
    numbers = []
    for num_str in number_strings:
        # Skip empty strings
        if num_str:
            # Remove ',' and '.'; both are treated as thousands separators,
            # matching how Amazon's EU marketplaces format review/rank counts
            num_str = num_str.replace(',', '').replace('.', '')
            try:
                # Convert to int
                number = int(num_str)
                numbers.append(number)
            except ValueError:
                continue

    return numbers
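
# Illustrative examples (assumed inputs, not taken from a real page):
#   extract_numbers("4.522 Bewertungen (Top 100)")  ->  [4522]
#   extract_numbers("1,234 ratings")                ->  [1234]
# Because ',' and '.' are both stripped, a decimal like "12.5" would come back
# as 125, so this helper suits counts and ranks rather than decimal prices.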
 
 
# def get_captcha(pic):
#     with open(pic, 'rb') as f:
#         image = f.read()
#         text = sdk.predict(image_bytes=image)
#     return text
 
 
class AmazonScraper:
    def __init__(self):
        self.asin_list = []
        self.category = None
        self.browse_node = None
        self.browser = None
        self.playwright = None
        self.timeout = 15000
        self.country = None
        self.keyword = None
        self.asin = None
        self.end_pages = 5  # maximum number of pages to scrape
        self.delay = 3  # delay between requests (seconds)
        self.results = []  # scraped product info
        self.asin_position = None  # position of a specific ASIN
        self.page = None  # browser page object
        self.start_pages = 1  # current page counter
        self.country_code = {  # delivery zip code per marketplace
            "com": '10000',
            "de": '10115',
            "it": '00020',
        }
 
    def open_browser(self):
        """Initialize the browser and page."""
        print('Opening browser')
        try:
            # Keep a reference to the playwright instance
            self.playwright = sync_playwright().start()

            # Launch the browser (headless=False keeps the window visible)
            self.browser = self.playwright.chromium.launch(
                headless=False,
                args=['--start-maximized']
            )

            # Create a context with a fixed viewport and user agent
            context = self.browser.new_context(
                viewport={'width': 1280, 'height': 800},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            )
            # Create the page
            self.page = context.new_page()
            self.page.set_extra_http_headers({
                "Referer": f"https://amazon.{self.country}/"
            })
            print('Browser opened successfully')

        except Exception as e:
            print(f'Failed to open browser: {str(e)}')
            # Make sure resources are cleaned up
            if hasattr(self, 'page') and self.page:
                self.page.close()
            if hasattr(self, 'browser') and self.browser:
                self.browser.close()
            if hasattr(self, 'playwright') and self.playwright:
                self.playwright.stop()
            raise e
 
    def close_browser(self):
        """Close the browser and clean up resources."""
        try:
            if hasattr(self, 'page') and self.page:
                self.page.close()
            if hasattr(self, 'browser') and self.browser:
                self.browser.close()
            if hasattr(self, 'playwright') and self.playwright:
                self.playwright.stop()
            print('Browser closed')

        except Exception as e:
            print(f'Error while closing browser: {str(e)}')
 
    def start_init(self):
        """Initialize the page, handle the captcha and the cookie consent banner."""
        print('Start scraping')
        self.open_browser()
        self.page.goto(f"https://www.amazon.{self.country}/")
        self.captcha()
        self.accept()
        self.set_country()
        # self.captcha.start()
        # self.accept.start()
 
    def set_country(self):
        """Set the delivery location via the zip code popover."""
        try:
            print('Setting country')
            self.page.wait_for_load_state("load")
            self.page.locator("#nav-global-location-popover-link").click()
            self.page.locator("#GLUXZipUpdateInput").click()
            self.page.locator("#GLUXZipUpdateInput").fill(self.country_code.get(self.country))
            self.page.locator("#GLUXZipUpdateInput").press("Enter")
            time.sleep(3)
            self.page.locator(".a-button-close.a-declarative").press(key="Enter")
            self.page.wait_for_load_state("load")
        except Exception as e:
            print(e)
 
    def captcha(self):
        """Solve the captcha page (if one is shown) with Tesseract OCR."""
        try:
            print('captcha')
            imageb = self.page.locator('form').locator('.a-row.a-text-center').screenshot(path='captcha.png')
            image = Image.open(io.BytesIO(imageb))
            captcha_text = pytesseract.image_to_string(image, config=custom_config).strip()
            print(captcha_text)
            self.page.locator('#captchacharacters').fill(captcha_text)
            self.page.locator('#captchacharacters').press('Enter')
            time.sleep(5)
        except:
            pass
 
    def accept(self):
        """Accept the cookie consent banner."""
        try:
            print('cc-accept')
            self.page.locator("#sp-cc-accept").click()
        except:
            pass
 
    def go_to_next_page(self):
        """Click the next-page button on the search results page."""

        next_button = self.page.locator(".s-pagination-next")
        next_button.click()
        self.page.wait_for_load_state("load")  # wait for the page to load
        # time.sleep(self.delay)
        self.page.screenshot(path='screenshot.png')
        print(self.page.url)
 
    def search_keyword(self):
        """
        Type the keyword into the search box and submit.
        :return:
        """
        print("Typing keyword")
        time.sleep(3)
        self.page.wait_for_load_state("load")  # wait for the page to load
        self.page.locator("#twotabsearchtextbox").press_sequentially(self.keyword)
        time.sleep(3)
        self.page.locator("#nav-search-submit-button").click()
        self.page.wait_for_load_state("load")
 
    def scrape_keyword(self):
        """
        Scrape one page of search results.
        :return:
        """
        self.page.wait_for_load_state("load")
        self.page.wait_for_selector('[data-asin]:not([data-asin=""])')
        products = self.page.locator('[data-asin]:not([data-asin=""])')
        count = products.count()
        # heat = self.page.locator('').inner_text()
        print(count)
        # Throttle the request rate
        for index in range(count):
            asin = products.nth(index).get_attribute("data-asin")
            try:
                title = products.nth(index).locator("h2").inner_text()
            except:
                title = ""
            try:
                link = products.nth(index).locator("h2").locator("a").get_attribute("href")
            except:
                link = ""
            try:
                position = (self.start_pages, index + 1)
            except:
                position = ""
            try:
                price = products.nth(index).locator("span.a-price-whole").inner_text().replace("\n", "").replace(
                    "\u202f", "").replace(",", "").replace(".", "")
            except:
                price = ""
            try:
                volume = products.nth(index).locator('span[data-component-type="s-client-side-analytics"]').inner_text()

            except Exception as e:
                volume = ""
            try:
                rate = products.nth(index).locator("i[data-cy='reviews-ratings-slot']").inner_text().split(" ")[0]
            except Exception as e:
                rate = ""
            product_info = {
                'asin': asin,
                'title': title,
                'link': f"https://www.amazon.{self.country}{link}",
                'position': position,
                'price': price,
                'volume': volume,
                # 'heat': 'heat',
                'rate': rate,
            }
            print(product_info)
            self.results.append(product_info)
 
    def scrapy_by_keyword(self, keyword, country='com', start_pages=1, end_pages=5):
        """
        Scrape product info for a search keyword.
        :param keyword: keyword to type into the search box
        :param country: marketplace to scrape
        :param start_pages: first page to scrape
        :param end_pages: last page to scrape
        :return:
        """
        self.keyword = keyword
        self.country = country
        self.start_pages = start_pages
        self.end_pages = end_pages
        self.start_init()
        self.search_keyword()
        now_page = 1
        while now_page <= self.end_pages:

            # Skip forward to the first page that should be scraped
            while now_page < self.start_pages:
                print(f'////current page {now_page}')
                self.go_to_next_page()
                now_page += 1

            time.sleep(self.delay)
            print(f"///////scraping page {now_page}///////")
            self.scrape_keyword()
            self.go_to_next_page()
            now_page += 1
            self.start_pages += 1  # keep start_pages in sync so position tuples record the current page
        self.close_browser()
        return self.results
 
    def goto_aplus(self, asin):
        """
        Go to the product detail page.
        :param asin: product id to visit
        :return:
        """
        self.page.goto(f"https://www.amazon.{self.country}/dp/{asin}")
 
    def scrapy_asin(self):
        """
        Scrape the currently open product detail page.
        :return:
        """
        self.page.wait_for_load_state("load")
        try:
            title = self.page.locator("span#productTitle").inner_text()
        except Exception as e:
            title = ""
            print(e)
        try:
            bullet_list = self.page.locator("div#feature-bullets").locator("span.a-list-item")
            bullet = [bullet_list.nth(i).inner_text() for i in range(bullet_list.count())]
        except Exception as e:
            bullet = ""
            print(e)
        try:
            rank_text = self.page.locator("table.a-keyvalue.prodDetTable").get_by_text("Amazon").locator("..").locator(
                "td").filter(has_text="in").inner_text()
            rank = extract_numbers(rank_text)
        except Exception as e:
            rank = ""
            print(e)
        try:
            volume = extract_numbers(self.page.locator("span#acrCustomerReviewText").nth(0).inner_text())[0]
        except Exception as e:
            volume = ""
            print(e)
        try:
            price = self.page.locator("span.a-price .a-offscreen").nth(0).inner_text().replace("\n", '').replace(
                "\u202f", "")
        except Exception as e:
            price = ""
            print(e)
        try:
            rate = \
                self.page.locator("#acrPopover").nth(0).inner_text().replace("\n", '').replace("\u202f", "").split(" ")[
                    0]
        except Exception as e:
            rate = ""
            print(e)
        product_info = {
            'title': title,
            'bullet': bullet,
            'rank': rank,
            'volume': volume,
            'price': price,
            'rate': rate,
        }
        self.results.append(product_info)
        print(product_info)
 
    def scrapy_by_asin(self, asin, country='com'):
        """
        Scrape a product detail page by ASIN.
        :param asin: product id to scrape
        :param country: marketplace suffix: com for the US, de for Germany, it for Italy, ...
        :return: scraped results
        """
        self.country = country
        self.asin = asin
        self.start_init()
        self.goto_aplus(asin=self.asin)
        # If the listing has variations (swatches), scrape every variation
        swatch = self.page.locator(".swatches")
        if swatch.is_visible():
            for i in range(swatch.locator("li").count()):
                print(i + 1, "/", swatch.locator("li").count())
                swatch.locator("li").nth(i).click()
                self.scrapy_asin()
        else:
            self.scrapy_asin()
        self.close_browser()
        return self.results
 
    def goto_bs(self):
        """
        Go to the Best Sellers ranking page.
        :return:
        """
        self.page.goto(f"https://www.amazon.{self.country}/gp/bestsellers/{self.category}/{self.browse_node}")
 
    def human_like_scroll(self, scroll_height: int = None, max_scrolls: int = None):
        """
        Simulate human-like page scrolling.

        Args:
            scroll_height: maximum pixels per scroll (random if None)
            max_scrolls: maximum number of scrolls (None: keep scrolling until the page bottom)
        """

        # Initial page height
        last_height = self.page.evaluate('document.documentElement.scrollHeight')
        scroll_count = 0

        while True:
            # Stop once the maximum number of scrolls is reached
            if max_scrolls and scroll_count >= max_scrolls:
                break

            # Random scroll distance (500-1000 pixels by default)
            scroll_distance = scroll_height or random.randint(500, 1000)

            # Smooth scroll via JavaScript
            self.page.evaluate(f'''
                window.scrollBy({{
                    top: {scroll_distance},
                    behavior: 'smooth'
                }});
            ''')
            self.page.wait_for_load_state("load")
            # Random pause (simulate reading)
            time.sleep(random.uniform(1, 3))

            # Occasionally do a small up/down scroll (simulate interest in the content)
            if random.random() < 0.3:  # 30% of the time
                small_scroll = random.randint(-100, 100)
                self.page.evaluate(f'window.scrollBy(0, {small_scroll})')
                time.sleep(random.uniform(0.5, 1))
            time.sleep(6)
            # Stop at the page bottom; only checked when no explicit scroll limit is given,
            # so lazy-loading pages can still be scrolled a fixed number of times
            new_height = self.page.evaluate('document.documentElement.scrollHeight')
            if max_scrolls is None and new_height == last_height:
                break

            last_height = new_height
            scroll_count += 1
 
    def scrapy_bsr(self):
        """
        Collect every ASIN shown on the current Best Sellers page.
        :return:
        """
        item_list = self.page.locator("#gridItemRoot")
        count = item_list.count()
        for i in range(count):
            asin = item_list.nth(i).locator("div[data-asin]").get_attribute("data-asin")
            self.asin_list.append(asin)
 
    def scrapy_by_bsr(self, country, category, browse_node):
        """
        Scrape the top 100 products on a Best Sellers ranking: first collect their ASINs,
        then scrape each detail page with scrapy_by_asin.
        :param country: marketplace to scrape
        :param category: category to scrape
        :param browse_node: browse node id of the category
        :return: scraped results
        """
        self.country = country
        self.browse_node = browse_node
        self.category = category
        self.start_init()
        self.goto_bs()

        time.sleep(self.delay)
        print(f"///////scraping page {1}///////")
        self.scrapy_bsr()
        self.human_like_scroll(max_scrolls=20)
        # Go to the next page; the Best Sellers pagination button differs from the one on
        # the search results page, so it is clicked separately here
        self.page.locator("li.a-last").click()
        print(f"///////scraping page {2}///////")
        self.page.wait_for_load_state("load")
        self.human_like_scroll(max_scrolls=20)
        self.scrapy_bsr()
        for i in self.asin_list:
            # Still unresolved here
            # self.scrapy_by_asin(asin=i, country=self.country)
            pass
        self.close_browser()

        return self.results
 
 
# Quick check that the three scrapers run
res1 = AmazonScraper().scrapy_by_keyword(keyword='e scooter', country='com', end_pages=2)
print(res1)
res2 = AmazonScraper().scrapy_by_asin(asin="B09PRRW4RL", country="de")
print(res2)
res3 = AmazonScraper().scrapy_by_bsr("it", "sports", "3589076031")
print(res3)
 
