Amazonやらせレビューチェックの究極版【Python並列処理】

Amazonやらせレビューチェックの究極版【Python並列処理】 プログラミング

「並列処理とは?」に関しては、この記事では説明しません。
説明する内容は、実務的にPythonで並列処理を行なうモノとなります。

また、それを無機質な例で示すのも面白くありません。
そこで、過去に行ってきたスクレイピングを並列で行います。

本記事の内容

  • Pythonで並列処理を行なうため必要なライブラリ・モジュール
  • concurrent.futuresによる並列処理
  • Amazonの商品評価を一気に行う(並列処理)
  • 【サンプルコード】Amazonの商品評価

それでは、上記に沿って解説をしていきます。

Pythonで並列処理を行なうため必要なライブラリ・モジュール

「Python 並列処理」で検索したら、以下の3つが候補に出てきます。

  • threading
  • concurrent.futures
  • Joblib

今回は、concurrent.futuresを利用します。
理由は、以下の2つです。

  • 追加でインストール不要
  • マルチスレッド・マルチプロセスの両方に対応

追加でインストール不要

Python 3.5以上であれば、何もインストールする必要がないからです。
標準でインストールされているということですね。

なお、バージョンは以下のコードで確認可能。

import sys

print(sys.version)

私の環境では、以下のように表示されました。

3.7.3 (default, Mar 27 2019, 17:13:21) [MSC v.1915 64 bit (AMD64)]

マルチスレッド・マルチプロセスの両方に対応

スレッドやプロセスに関しては、ここでは詳細な説明は行いません。
この記事では、あくまで実務的なことを優先します。

concurrent.futures モジュールは、非同期に実行できる呼び出し可能オブジェクトの高水準のインタフェースを提供します。
非同期実行は ThreadPoolExecutor を用いてスレッドで実行することも、 ProcessPoolExecutor を用いて別々のプロセスで実行することもできます. どちらも Executor 抽象クラスで定義された同じインターフェースを実装します。

上記は、concurrent.futuresのドキュメントからの引用です。
concurrent.futuresを用いれば、マルチスレッド・マルチプロセスを意識する必要がないと言えます。

対象となる並列処理によっては、どちらかでしか動かないということもあり得ます。
そのようなリスクに備えて、切り替え可能なconcurrent.futuresは価値があります。

concurrent.futuresによる並列処理

以下のパターンの並列処理を確認します。
基本的には、公式のドキュメントをベースにしています。

  • マルチスレッド(ThreadPoolExecutor)
  • マルチプロセス(ProcessPoolExecutor)

マルチスレッド(ThreadPoolExecutor)

import concurrent.futures
import urllib.request

URLS = ['https://employment.en-japan.com/',
        'https://next.rikunabi.com/',
        'https://tenshoku.mynavi.jp/',
        'https://doda.jp/',
        'https://www.ecareer.ne.jp/']

# 各ページを取得し、URLと内容を報告
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# 並列処理
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r 例外発生: %s' % (url, exc))
        else:
            print('%r ページの容量は %d bytes' % (url, len(data)))

これを2回実行すると以下のような結果になりました。

1回目

'https://doda.jp/' ページの容量は 112338 bytes
'https://employment.en-japan.com/' ページの容量は 72657 bytes
'https://next.rikunabi.com/' ページの容量は 12816 bytes
'https://tenshoku.mynavi.jp/' ページの容量は 191062 bytes
'https://www.ecareer.ne.jp/' ページの容量は 257070 bytes

2回目

'https://doda.jp/' ページの容量は 112338 bytes
'https://employment.en-japan.com/' ページの容量は 72703 bytes
'https://tenshoku.mynavi.jp/' ページの容量は 191062 bytes
'https://next.rikunabi.com/' ページの容量は 12816 bytes
'https://www.ecareer.ne.jp/' ページの容量は 256927 bytes

表示の順番が異なるのは、並列処理した結果の証拠と言えます。
また、逐次処理であれば、URLSに記述した順番になるはずです。

マルチプロセス(ProcessPoolExecutor)

公式のコードそのままです。
これは問題なく動きます。

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

実行した結果は以下。

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False

切り替えの検証

上記の公式のサンプルコードをもとに切り替えの検証をしてみます。

ThreadPoolExecutor ⇒ ProcessPoolExecutor

ダメですね。

'https://employment.en-japan.com/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending.
'https://next.rikunabi.com/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending.
'https://tenshoku.mynavi.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending.
'https://doda.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending.
'https://www.ecareer.ne.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending.

マルチスレッドからマルチプロセスへの切り替えがアウトなのか?
それとも、load_url関数の処理がマルチプロセスに適さないのか?

とりあえず、ここではこの事象は追いかけません。
次の検証に進みます。

ProcessPoolExecutor ⇒ ThreadPoolExecutor

こちらの切り替えは、上手くいきました。

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False

マルチプロセスからマルチスレッドへの切り替えがOKなのか?
is_prime関数の処理がマルチスレッドにも対応できるからか?

まとめ

まあ、検証を結果を見ると疑問は残りますね。
と言っても、実際はどちらかで望む処理が動けばどちらでも問題ありません。

Amazonの商品評価を一気に行う(並列処理)

大体、並列処理のやり方はわかりました。
あとは、実際に動くかどうかですね。

今回の処理を行なう上での懸念は、Seleniumの処理です。
Seleniumに関しては、以下の記事を参考にしてください。

SeleniumはブラウザをPythonから動かす処理です。
それを並列で行えるのか?
これが最大の懸念ということです。

Seleniumによる並列処理の検証

まずは、この懸念を解消するための検証を行います。

import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

CHROMEDRIVER = "chromedriver.exeのパス"

URLS = ['https://www.amazon.co.jp/dp/4297113511',
        'https://www.amazon.co.jp/dp/4865940650',
        'https://www.amazon.co.jp/dp/4844336479']

# ASIN取得
def get_asin_from_amazon(url):
     
    asin = ""
    # ヘッドレスモードでブラウザを起動
    options = Options()
    #options.add_argument('--headless')
     
    # ブラウザーを起動
    driver = webdriver.Chrome(CHROMEDRIVER, options=options)
    driver.get(url)
    driver.implicitly_wait(10)  # 見つからないときは、10秒まで待つ
     
    elem_base = driver.find_element_by_id('ASIN')
    if elem_base:
        asin = elem_base.get_attribute("value")
    else:
        print("NG")
         
    # ブラウザ停止
    driver.quit()
     
    return asin

if __name__ == '__main__':
    
    # 並列処理
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future_to_url = {executor.submit(get_asin_from_amazon, url): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r 例外発生: %s' % (url, exc))
            else:
                print(data)

マルチスレッド(ThreadPoolExecutor)のサンプルを改良したモノです。
簡単に仕様を説明します。

URLSにはAmazonの商品ページのURLを3個記述しています。
AmazonのURLはアドレスバーからコピーすると恐ろしく長いモノになります。

この記事上では、便宜上短いURL(ASIN付き)となります。
なお、恐ろしく長いURL(ASINがわからない)を想定してプログラムは組んでいます。
だから、わざわざASINを取得する関数が存在しているのです。

get_asin_from_amazon関数で該当URLのページからASINを取得します。
その際、該当URLへはSeleniumを用いてアクセス・スクレイピングしています。

このコードは、実際に並列処理をしているのか?
これに対しては、実際にコードを動かせばわかると思います。

なんと、ブラウザが3個同時に立ち上がります!!
なかなか、面白いですよ。

ただし、あまり動かしすぎないように注意してください。
Amazonへのアクセスが同時に発生していますので。

ちなみに、このままではウザイです。
そのため、ブラウザを見えないようにするには以下のシャープ(#)を除去します。

#options.add_argument('--headless')

なお、Seleniumに関する処理部分でわからないコードがある場合は、以下の記事をご覧ください。

Amazonの商品評価を一気に行う仕様

  1. AmazonのページへアクセスしてASINを取得
  2. ASINをもとにサクラチェッカーの評価を取得 ⇒ 表示
  3. ASINをもとにレビュー探偵の評価を取得 ⇒ 表示

これらが主な機能となります。
そして、今回は2と3を並列処理します。

別に2⇒3と逐次処理でも問題はありません。
でも、なるべくなら早く処理は完了して欲しいです。
それに並列処理を学ぶには、うってつけの機会でもありますしね。

なお、Amazonにおける評価は取得していません。
そもそも、信用できないモノ扱いです。

比較対象としては取得してもいいのかもしれませんね。
でも、今回はナシでいきます。

チェックしたいAmazonの商品ページのURLが、入力する値です。
その入力に対する結果として、以下の2つのサイトにおける評価を得られます。

  • サクラチェッカー
  • レビュー探偵

それぞれのサイトに対しては、過去にスクレイピング済みです。
各サイトにスクレイピングに関する部分は、記事をご覧ください。

【サンプルコード】Amazonの商品評価

コピペで動くコードです。

今回は、過去のスクレイピングのプログラムをそのまま移植しています。
詳細は、過去記事をご覧ください。

import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

URL = "https://www.amazon.co.jp/dp/B07HD71F8N"

SITES = [
    "sakura",
    "tantei"]

CHROMEDRIVER = "chromedriver.exeのパス"

# ASIN取得
def get_asin_from_amazon(url):
     
    asin = ""
    # ヘッドレスモードでブラウザを起動
    options = Options()
    options.add_argument("--headless")
     
    # ブラウザーを起動
    driver = webdriver.Chrome(CHROMEDRIVER, options=options)
    driver.get(url)
    driver.implicitly_wait(10)  # 見つからないときは、10秒まで待つ
     
    elem_base = driver.find_element_by_id('ASIN')
    if elem_base:
        asin = elem_base.get_attribute("value")
    else:
        print("NG")
         
    # ブラウザ停止
    driver.quit()
     
    return asin

# サクラ度分析の値取得
def get_detail_value(elem):
    
    value = elem.find_element_by_tag_name("div").text
    value = value.replace("%", "", 1)
    value = value.strip()
    
    return value

# サクラチェッカーをスクレイピング
def get_sakurachecker(asin):
    
    url = "https://sakura-checker.jp/search/" + asin + "/"
    
    # ヘッドレスモードでブラウザを起動
    options = Options()
    options.add_argument('--headless')
    
    # ブラウザーを起動
    driver = webdriver.Chrome(CHROMEDRIVER, options=options)
    driver.get(url)
    driver.implicitly_wait(10)  # 見つからないときは、10秒まで待つ
    
    
    # 独自の評価
    rating = ""
    try:
        elem_base = driver.find_element_by_class_name("mainBlock")
        elem_rating = elem_base.find_element_by_class_name('item-rating')
        rating = elem_rating.text
        rating = rating.replace("/5", "", 1)
    except:
        traceback.print_exc()     
    
    # サクラ度
    try:
        elem_sakura_num = driver.find_element_by_class_name("sakura-num")
        sakura_num = elem_sakura_num.text
        sakura_num = sakura_num.replace("%", "", 1)
    except:
        traceback.print_exc()   
    
    # サクラ度の詳細分析
    try:
        elem_base = driver.find_element_by_class_name("mainBlock")
        elem_circle = elem_base.find_elements_by_class_name('circlecustom')
      
        price_product = get_detail_value(elem_circle[0])
        shop_area = get_detail_value(elem_circle[1])
        shop_review = get_detail_value(elem_circle[2])
        review_distribution = get_detail_value(elem_circle[3])
        review_date = get_detail_value(elem_circle[4])
        review_reviewer = get_detail_value(elem_circle[5])
    except:
        traceback.print_exc()    
    
    # ブラウザ停止
    driver.quit()
    
    print("【サクラチェッカー】")
    print("独自評価:" + rating)
    print("サクラ度:" + sakura_num)
    print("価格・製品:" + price_product)
    print("ショップ情報・地域:" + shop_area)
    print("ショップレビュー:" + shop_review)
    print("レビュー分布:" + review_distribution)
    print("レビュー日付:" + review_date)
    print("レビュー&amp;レビュアー:" + review_reviewer)
    print("----------------------------------")

# レビュー探偵をスクレイピング
def get_reviewtantei(asin):
    
    url = "https://review-tantei.com/detail/" + asin + "/"
    
    try:
        res = requests.get(url, headers=headers)
        soup = BeautifulSoup(res.content, features='lxml')
        
        # 独自の評価
        div_new_review_rating = soup.find(class_="new_review_rating")
        rating = div_new_review_rating.find("strong").text
        rating = rating.replace("★", "", 1)
        rating = rating.strip()
        
        # レビューの信用度と判定結果
        div_detail_el_trust = soup.find(class_="detail-el-trust")
        
        trust_score = div_detail_el_trust.find(class_="trust_score").find("strong").text
        trust_score = trust_score.replace("%", "", 1)
        trust_score = trust_score.strip()
        
        tmp_trust_grade = div_detail_el_trust.find(class_="trust_grade").find("strong").text
        tmp_trust_grade = tmp_trust_grade.replace("%", "", 1)
        tmp_trust_grade = tmp_trust_grade.strip()
        tmp_trust_grade_list = tmp_trust_grade.split(' ')
        trust_grade = tmp_trust_grade_list[0]
        
        if ('A' in trust_grade):
            trust_grade_2 = 6
        elif ('B' in trust_grade):
            trust_grade_2 = 5
        elif ('C' in trust_grade):
            trust_grade_2 = 4
        elif ('D' in trust_grade):
            trust_grade_2 = 3
        elif ('E' in trust_grade):
            trust_grade_2 = 2
        elif ('F' in trust_grade):
            trust_grade_2 = 1
        elif ('S' in trust_grade):
            trust_grade_2 = 7
        else:
            trust_grade_2 = 0
        
        # レビュー評価詳細
        ul_outer = soup.find(class_="list-group list-group-flush text-left")
        li_list_group_item = ul_outer.find_all(class_="list-group-item")
        
        check = []
        for i in range(len(li_list_group_item)):
            tmp = li_list_group_item[i].find_all(class_="fa")
            
            if len(tmp) == 2:
                # 「× ×」
                check_point = 1
            else:
                target_str = str(tmp[0])


                if ('text-danger' in target_str):
                    # 「×」
                    check_point = 2
                elif ('text-warning' in target_str):
                    # 「!」
                    check_point = 3
                elif ('text-primary' in target_str):
                    # 「○」
                    check_point = 4
                else:
                    check_point = 0
            
            check.append(check_point)
                
        # 最終調査
        analyzed_at = soup.find(class_="analyzed_at").text
        analyzed_at = analyzed_at.replace("最終調査:", "", 1)
        analyzed_at = analyzed_at.strip()

    except:
        print(res)
        traceback.print_exc()         
    
    print("【レビュー探偵】")
    print("独自評価:" + rating)
    print("レビューの信用度:" + trust_score)
    print("判定結果:" + str(trust_grade_2))
    print("レビュー評価詳細:")
    print(check)
    print("最終調査:" + analyzed_at)
    print("----------------------------------")

def get_review(site, asin):

    if site == "sakura":
        get_sakurachecker(asin)
    else:
        get_reviewtantei(asin)

if __name__ == "__main__":
    
    asin = get_asin_from_amazon(URL)
    
    # 並列処理
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_site = {executor.submit(get_review, site, asin): site for site in SITES}

ポイントは、get_review関数を設けたところです。
この関数の中でget_sakuracheckerとget_reviewtanteiを呼び分けています。
その際の区別は、SITESの値で行う仕様となります。

別の商品を評価する場合は、以下の値を変更します。

URL = "https://www.amazon.co.jp/dp/B07HD71F8N"

このコードの実行結果は、以下となります。

【レビュー探偵】
独自評価:1.6
レビューの信用度:25
判定結果:2
レビュー評価詳細:
[4, 3, 4, 4, 4, 1]
最終調査:2020-05-02 08:43:40
----------------------------------
【サクラチェッカー】
独自評価:1.89
サクラ度:90
価格・製品:95
ショップ情報・地域:95
ショップレビュー:0
レビュー分布:35
レビュー日付:35
レビュー&amp;レビュアー:35
----------------------------------

それぞれ並列でアクセスしています。
レビュー探偵の方が静的コンテンツであるため、先に処理完了となるはずです。
サクラチェッカーはJavaScriptによる動的コンテンツのため、若干待ちが発生します。

ちなみに、Amazonでの該当商品の評価は以下。

レビュー探偵とサクラチェッカーはともに信用できないと評価しています。
つまり、購入してはダメな商品となりますね。

タイトルとURLをコピーしました