「並列処理とは?」に関しては、この記事では説明しません。
説明する内容は、実務的にPythonで並列処理を行なうモノとなります。
また、それを無機質な例で示すのも面白くありません。
そこで、過去に行ってきたスクレイピングを並列で行います。
本記事の内容
- Pythonで並列処理を行なうため必要なライブラリ・モジュール
- concurrent.futuresによる並列処理
- Amazonの商品評価を一気に行う(並列処理)
- 【サンプルコード】Amazonの商品評価
それでは、上記に沿って解説をしていきます。
Pythonで並列処理を行なうため必要なライブラリ・モジュール
「Python 並列処理」で検索したら、以下の3つが候補に出てきます。
- threading
- concurrent.futures
- Joblib
今回は、concurrent.futuresを利用します。
理由は、以下の2つです。
- 追加でインストール不要
- マルチスレッド・マルチプロセスの両方に対応
追加でインストール不要
Python 3.5以上であれば、何もインストールする必要がないからです。
標準でインストールされているということですね。
なお、バージョンは以下のコードで確認可能。
import sys print(sys.version)
私の環境では、以下のように表示されました。
3.7.3 (default, Mar 27 2019, 17:13:21) [MSC v.1915 64 bit (AMD64)]
マルチスレッド・マルチプロセスの両方に対応
スレッドやプロセスに関しては、ここでは詳細な説明は行いません。
この記事では、あくまで実務的なことを優先します。
非同期実行は ThreadPoolExecutor を用いてスレッドで実行することも、 ProcessPoolExecutor を用いて別々のプロセスで実行することもできます. どちらも Executor 抽象クラスで定義された同じインターフェースを実装します。
上記は、concurrent.futuresのドキュメントからの引用です。
concurrent.futuresを用いれば、マルチスレッド・マルチプロセスを意識する必要がないと言えます。
対象となる並列処理によっては、どちらかでしか動かないということもあり得ます。
そのようなリスクに備えて、切り替え可能なconcurrent.futuresは価値があります。
concurrent.futuresによる並列処理
以下のパターンの並列処理を確認します。
基本的には、公式のドキュメントをベースにしています。
- マルチスレッド(ThreadPoolExecutor)
- マルチプロセス(ProcessPoolExecutor)
マルチスレッド(ThreadPoolExecutor)
import concurrent.futures import urllib.request URLS = ['https://employment.en-japan.com/', 'https://next.rikunabi.com/', 'https://tenshoku.mynavi.jp/', 'https://doda.jp/', 'https://www.ecareer.ne.jp/'] # 各ページを取得し、URLと内容を報告 def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # 並列処理 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r 例外発生: %s' % (url, exc)) else: print('%r ページの容量は %d bytes' % (url, len(data)))
これを2回実行すると以下のような結果になりました。
1回目
'https://doda.jp/' ページの容量は 112338 bytes 'https://employment.en-japan.com/' ページの容量は 72657 bytes 'https://next.rikunabi.com/' ページの容量は 12816 bytes 'https://tenshoku.mynavi.jp/' ページの容量は 191062 bytes 'https://www.ecareer.ne.jp/' ページの容量は 257070 bytes
2回目
'https://doda.jp/' ページの容量は 112338 bytes 'https://employment.en-japan.com/' ページの容量は 72703 bytes 'https://tenshoku.mynavi.jp/' ページの容量は 191062 bytes 'https://next.rikunabi.com/' ページの容量は 12816 bytes 'https://www.ecareer.ne.jp/' ページの容量は 256927 bytes
表示の順番が異なるのは、並列処理した結果の証拠と言えます。
また、逐次処理であれば、URLSに記述した順番になるはずです。
マルチプロセス(ProcessPoolExecutor)
公式のコードそのままです。
これは問題なく動きます。
import concurrent.futures import math PRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def main(): with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print('%d is prime: %s' % (number, prime)) if __name__ == '__main__': main()
実行した結果は以下。
112272535095293 is prime: True 112582705942171 is prime: True 112272535095293 is prime: True 115280095190773 is prime: True 115797848077099 is prime: True 1099726899285419 is prime: False
切り替えの検証
上記の公式のサンプルコードをもとに切り替えの検証をしてみます。
ThreadPoolExecutor ⇒ ProcessPoolExecutor
ダメですね。
'https://employment.en-japan.com/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://next.rikunabi.com/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://tenshoku.mynavi.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://doda.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://www.ecareer.ne.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending.
マルチスレッドからマルチプロセスへの切り替えがアウトなのか?
それとも、load_url関数の処理がマルチプロセスに適さないのか?
とりあえず、ここではこの事象は追いかけません。
次の検証に進みます。
ProcessPoolExecutor ⇒ ThreadPoolExecutor
こちらの切り替えは、上手くいきました。
112272535095293 is prime: True 112582705942171 is prime: True 112272535095293 is prime: True 115280095190773 is prime: True 115797848077099 is prime: True 1099726899285419 is prime: False
マルチプロセスからマルチスレッドへの切り替えがOKなのか?
is_prime関数の処理がマルチスレッドにも対応できるからか?
まとめ
まあ、検証を結果を見ると疑問は残りますね。
と言っても、実際はどちらかで望む処理が動けばどちらでも問題ありません。
Amazonの商品評価を一気に行う(並列処理)
大体、並列処理のやり方はわかりました。
あとは、実際に動くかどうかですね。
今回の処理を行なう上での懸念は、Seleniumの処理です。
Seleniumに関しては、以下の記事を参考にしてください。
SeleniumはブラウザをPythonから動かす処理です。
それを並列で行えるのか?
これが最大の懸念ということです。
Seleniumによる並列処理の検証
まずは、この懸念を解消するための検証を行います。
import concurrent.futures from selenium import webdriver from selenium.webdriver.chrome.options import Options CHROMEDRIVER = "chromedriver.exeのパス" URLS = ['https://www.amazon.co.jp/dp/4297113511', 'https://www.amazon.co.jp/dp/4865940650', 'https://www.amazon.co.jp/dp/4844336479'] # ASIN取得 def get_asin_from_amazon(url): asin = "" # ヘッドレスモードでブラウザを起動 options = Options() #options.add_argument('--headless') # ブラウザーを起動 driver = webdriver.Chrome(CHROMEDRIVER, options=options) driver.get(url) driver.implicitly_wait(10) # 見つからないときは、10秒まで待つ elem_base = driver.find_element_by_id('ASIN') if elem_base: asin = elem_base.get_attribute("value") else: print("NG") # ブラウザ停止 driver.quit() return asin if __name__ == '__main__': # 並列処理 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: future_to_url = {executor.submit(get_asin_from_amazon, url): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r 例外発生: %s' % (url, exc)) else: print(data)
マルチスレッド(ThreadPoolExecutor)のサンプルを改良したモノです。
簡単に仕様を説明します。
URLSにはAmazonの商品ページのURLを3個記述しています。
AmazonのURLはアドレスバーからコピーすると恐ろしく長いモノになります。
この記事上では、便宜上短いURL(ASIN付き)となります。
なお、恐ろしく長いURL(ASINがわからない)を想定してプログラムは組んでいます。
だから、わざわざASINを取得する関数が存在しているのです。
get_asin_from_amazon関数で該当URLのページからASINを取得します。
その際、該当URLへはSeleniumを用いてアクセス・スクレイピングしています。
このコードは、実際に並列処理をしているのか?
これに対しては、実際にコードを動かせばわかると思います。
なんと、ブラウザが3個同時に立ち上がります!!
なかなか、面白いですよ。
ただし、あまり動かしすぎないように注意してください。
Amazonへのアクセスが同時に発生していますので。
ちなみに、このままではウザイです。
そのため、ブラウザを見えないようにするには以下のシャープ(#)を除去します。
#options.add_argument('--headless')
なお、Seleniumに関する処理部分でわからないコードがある場合は、以下の記事をご覧ください。
Amazonの商品評価を一気に行う仕様
- AmazonのページへアクセスしてASINを取得
- ASINをもとにサクラチェッカーの評価を取得 ⇒ 表示
- ASINをもとにレビュー探偵の評価を取得 ⇒ 表示
これらが主な機能となります。
そして、今回は2と3を並列処理します。
別に2⇒3と逐次処理でも問題はありません。
でも、なるべくなら早く処理は完了して欲しいです。
それに並列処理を学ぶには、うってつけの機会でもありますしね。
なお、Amazonにおける評価は取得していません。
そもそも、信用できないモノ扱いです。
比較対象としては取得してもいいのかもしれませんね。
でも、今回はナシでいきます。
チェックしたいAmazonの商品ページのURLが、入力する値です。
その入力に対する結果として、以下の2つのサイトにおける評価を得られます。
- サクラチェッカー
- レビュー探偵
それぞれのサイトに対しては、過去にスクレイピング済みです。
各サイトにスクレイピングに関する部分は、記事をご覧ください。
【サンプルコード】Amazonの商品評価
コピペで動くコードです。
今回は、過去のスクレイピングのプログラムをそのまま移植しています。
詳細は、過去記事をご覧ください。
import concurrent.futures from selenium import webdriver from selenium.webdriver.chrome.options import Options URL = "https://www.amazon.co.jp/dp/B07HD71F8N" SITES = [ "sakura", "tantei"] CHROMEDRIVER = "chromedriver.exeのパス" # ASIN取得 def get_asin_from_amazon(url): asin = "" # ヘッドレスモードでブラウザを起動 options = Options() options.add_argument("--headless") # ブラウザーを起動 driver = webdriver.Chrome(CHROMEDRIVER, options=options) driver.get(url) driver.implicitly_wait(10) # 見つからないときは、10秒まで待つ elem_base = driver.find_element_by_id('ASIN') if elem_base: asin = elem_base.get_attribute("value") else: print("NG") # ブラウザ停止 driver.quit() return asin # サクラ度分析の値取得 def get_detail_value(elem): value = elem.find_element_by_tag_name("div").text value = value.replace("%", "", 1) value = value.strip() return value # サクラチェッカーをスクレイピング def get_sakurachecker(asin): url = "https://sakura-checker.jp/search/" + asin + "/" # ヘッドレスモードでブラウザを起動 options = Options() options.add_argument('--headless') # ブラウザーを起動 driver = webdriver.Chrome(CHROMEDRIVER, options=options) driver.get(url) driver.implicitly_wait(10) # 見つからないときは、10秒まで待つ # 独自の評価 rating = "" try: elem_base = driver.find_element_by_class_name("mainBlock") elem_rating = elem_base.find_element_by_class_name('item-rating') rating = elem_rating.text rating = rating.replace("/5", "", 1) except: traceback.print_exc() # サクラ度 try: elem_sakura_num = driver.find_element_by_class_name("sakura-num") sakura_num = elem_sakura_num.text sakura_num = sakura_num.replace("%", "", 1) except: traceback.print_exc() # サクラ度の詳細分析 try: elem_base = driver.find_element_by_class_name("mainBlock") elem_circle = elem_base.find_elements_by_class_name('circlecustom') price_product = get_detail_value(elem_circle[0]) shop_area = get_detail_value(elem_circle[1]) shop_review = get_detail_value(elem_circle[2]) review_distribution = get_detail_value(elem_circle[3]) review_date = get_detail_value(elem_circle[4]) review_reviewer = get_detail_value(elem_circle[5]) except: traceback.print_exc() # ブラウザ停止 driver.quit() print("【サクラチェッカー】") print("独自評価:" + rating) print("サクラ度:" + sakura_num) print("価格・製品:" + price_product) print("ショップ情報・地域:" + shop_area) print("ショップレビュー:" + shop_review) print("レビュー分布:" + review_distribution) print("レビュー日付:" + review_date) print("レビュー&レビュアー:" + review_reviewer) print("----------------------------------") # レビュー探偵をスクレイピング def get_reviewtantei(asin): url = "https://review-tantei.com/detail/" + asin + "/" try: res = requests.get(url, headers=headers) soup = BeautifulSoup(res.content, features='lxml') # 独自の評価 div_new_review_rating = soup.find(class_="new_review_rating") rating = div_new_review_rating.find("strong").text rating = rating.replace("★", "", 1) rating = rating.strip() # レビューの信用度と判定結果 div_detail_el_trust = soup.find(class_="detail-el-trust") trust_score = div_detail_el_trust.find(class_="trust_score").find("strong").text trust_score = trust_score.replace("%", "", 1) trust_score = trust_score.strip() tmp_trust_grade = div_detail_el_trust.find(class_="trust_grade").find("strong").text tmp_trust_grade = tmp_trust_grade.replace("%", "", 1) tmp_trust_grade = tmp_trust_grade.strip() tmp_trust_grade_list = tmp_trust_grade.split(' ') trust_grade = tmp_trust_grade_list[0] if ('A' in trust_grade): trust_grade_2 = 6 elif ('B' in trust_grade): trust_grade_2 = 5 elif ('C' in trust_grade): trust_grade_2 = 4 elif ('D' in trust_grade): trust_grade_2 = 3 elif ('E' in trust_grade): trust_grade_2 = 2 elif ('F' in trust_grade): trust_grade_2 = 1 elif ('S' in trust_grade): trust_grade_2 = 7 else: trust_grade_2 = 0 # レビュー評価詳細 ul_outer = soup.find(class_="list-group list-group-flush text-left") li_list_group_item = ul_outer.find_all(class_="list-group-item") check = [] for i in range(len(li_list_group_item)): tmp = li_list_group_item[i].find_all(class_="fa") if len(tmp) == 2: # 「× ×」 check_point = 1 else: target_str = str(tmp[0]) if ('text-danger' in target_str): # 「×」 check_point = 2 elif ('text-warning' in target_str): # 「!」 check_point = 3 elif ('text-primary' in target_str): # 「○」 check_point = 4 else: check_point = 0 check.append(check_point) # 最終調査 analyzed_at = soup.find(class_="analyzed_at").text analyzed_at = analyzed_at.replace("最終調査:", "", 1) analyzed_at = analyzed_at.strip() except: print(res) traceback.print_exc() print("【レビュー探偵】") print("独自評価:" + rating) print("レビューの信用度:" + trust_score) print("判定結果:" + str(trust_grade_2)) print("レビュー評価詳細:") print(check) print("最終調査:" + analyzed_at) print("----------------------------------") def get_review(site, asin): if site == "sakura": get_sakurachecker(asin) else: get_reviewtantei(asin) if __name__ == "__main__": asin = get_asin_from_amazon(URL) # 並列処理 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: future_to_site = {executor.submit(get_review, site, asin): site for site in SITES}
ポイントは、get_review関数を設けたところです。
この関数の中でget_sakuracheckerとget_reviewtanteiを呼び分けています。
その際の区別は、SITESの値で行う仕様となります。
別の商品を評価する場合は、以下の値を変更します。
URL = "https://www.amazon.co.jp/dp/B07HD71F8N"
このコードの実行結果は、以下となります。
【レビュー探偵】 独自評価:1.6 レビューの信用度:25 判定結果:2 レビュー評価詳細: [4, 3, 4, 4, 4, 1] 最終調査:2020-05-02 08:43:40 ---------------------------------- 【サクラチェッカー】 独自評価:1.89 サクラ度:90 価格・製品:95 ショップ情報・地域:95 ショップレビュー:0 レビュー分布:35 レビュー日付:35 レビュー&レビュアー:35 ----------------------------------
それぞれ並列でアクセスしています。
レビュー探偵の方が静的コンテンツであるため、先に処理完了となるはずです。
サクラチェッカーはJavaScriptによる動的コンテンツのため、若干待ちが発生します。
ちなみに、Amazonでの該当商品の評価は以下。
レビュー探偵とサクラチェッカーはともに信用できないと評価しています。
つまり、購入してはダメな商品となりますね。