「並列処理とは?」に関しては、この記事では説明しません。
説明する内容は、実務的にPythonで並列処理を行なうモノとなります。
また、それを無機質な例で示すのも面白くありません。
そこで、過去に行ってきたスクレイピングを並列で行います。
本記事の内容
- Pythonで並列処理を行なうため必要なライブラリ・モジュール
- concurrent.futuresによる並列処理
- Amazonの商品評価を一気に行う(並列処理)
- 【サンプルコード】Amazonの商品評価
それでは、上記に沿って解説をしていきます。
Pythonで並列処理を行なうため必要なライブラリ・モジュール
「Python 並列処理」で検索したら、以下の3つが候補に出てきます。
- threading
- concurrent.futures
- Joblib
今回は、concurrent.futuresを利用します。
理由は、以下の2つです。
- 追加でインストール不要
- マルチスレッド・マルチプロセスの両方に対応
追加でインストール不要
Python 3.5以上であれば、何もインストールする必要がないからです。
標準でインストールされているということですね。
なお、バージョンは以下のコードで確認可能。
import sys print(sys.version)
私の環境では、以下のように表示されました。
3.7.3 (default, Mar 27 2019, 17:13:21) [MSC v.1915 64 bit (AMD64)]
マルチスレッド・マルチプロセスの両方に対応
スレッドやプロセスに関しては、ここでは詳細な説明は行いません。
この記事では、あくまで実務的なことを優先します。
非同期実行は ThreadPoolExecutor を用いてスレッドで実行することも、 ProcessPoolExecutor を用いて別々のプロセスで実行することもできます. どちらも Executor 抽象クラスで定義された同じインターフェースを実装します。
上記は、concurrent.futuresのドキュメントからの引用です。
concurrent.futuresを用いれば、マルチスレッド・マルチプロセスを意識する必要がないと言えます。
対象となる並列処理によっては、どちらかでしか動かないということもあり得ます。
そのようなリスクに備えて、切り替え可能なconcurrent.futuresは価値があります。
concurrent.futuresによる並列処理
以下のパターンの並列処理を確認します。
基本的には、公式のドキュメントをベースにしています。
- マルチスレッド(ThreadPoolExecutor)
- マルチプロセス(ProcessPoolExecutor)
マルチスレッド(ThreadPoolExecutor)
import concurrent.futures
import urllib.request
URLS = ['https://employment.en-japan.com/',
'https://next.rikunabi.com/',
'https://tenshoku.mynavi.jp/',
'https://doda.jp/',
'https://www.ecareer.ne.jp/']
# 各ページを取得し、URLと内容を報告
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# 並列処理
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r 例外発生: %s' % (url, exc))
else:
print('%r ページの容量は %d bytes' % (url, len(data)))
これを2回実行すると以下のような結果になりました。
1回目
'https://doda.jp/' ページの容量は 112338 bytes 'https://employment.en-japan.com/' ページの容量は 72657 bytes 'https://next.rikunabi.com/' ページの容量は 12816 bytes 'https://tenshoku.mynavi.jp/' ページの容量は 191062 bytes 'https://www.ecareer.ne.jp/' ページの容量は 257070 bytes
2回目
'https://doda.jp/' ページの容量は 112338 bytes 'https://employment.en-japan.com/' ページの容量は 72703 bytes 'https://tenshoku.mynavi.jp/' ページの容量は 191062 bytes 'https://next.rikunabi.com/' ページの容量は 12816 bytes 'https://www.ecareer.ne.jp/' ページの容量は 256927 bytes
表示の順番が異なるのは、並列処理した結果の証拠と言えます。
また、逐次処理であれば、URLSに記述した順番になるはずです。
マルチプロセス(ProcessPoolExecutor)
公式のコードそのままです。
これは問題なく動きます。
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
実行した結果は以下。
112272535095293 is prime: True 112582705942171 is prime: True 112272535095293 is prime: True 115280095190773 is prime: True 115797848077099 is prime: True 1099726899285419 is prime: False
切り替えの検証
上記の公式のサンプルコードをもとに切り替えの検証をしてみます。
ThreadPoolExecutor ⇒ ProcessPoolExecutor
ダメですね。
'https://employment.en-japan.com/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://next.rikunabi.com/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://tenshoku.mynavi.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://doda.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending. 'https://www.ecareer.ne.jp/' 例外発生: A process in the process pool was terminated abruptly while the future was running or pending.
マルチスレッドからマルチプロセスへの切り替えがアウトなのか?
それとも、load_url関数の処理がマルチプロセスに適さないのか?
とりあえず、ここではこの事象は追いかけません。
次の検証に進みます。
ProcessPoolExecutor ⇒ ThreadPoolExecutor
こちらの切り替えは、上手くいきました。
112272535095293 is prime: True 112582705942171 is prime: True 112272535095293 is prime: True 115280095190773 is prime: True 115797848077099 is prime: True 1099726899285419 is prime: False
マルチプロセスからマルチスレッドへの切り替えがOKなのか?
is_prime関数の処理がマルチスレッドにも対応できるからか?
まとめ
まあ、検証を結果を見ると疑問は残りますね。
と言っても、実際はどちらかで望む処理が動けばどちらでも問題ありません。
Amazonの商品評価を一気に行う(並列処理)
大体、並列処理のやり方はわかりました。
あとは、実際に動くかどうかですね。
今回の処理を行なう上での懸念は、Seleniumの処理です。
Seleniumに関しては、以下の記事を参考にしてください。
SeleniumはブラウザをPythonから動かす処理です。
それを並列で行えるのか?
これが最大の懸念ということです。
Seleniumによる並列処理の検証
まずは、この懸念を解消するための検証を行います。
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
CHROMEDRIVER = "chromedriver.exeのパス"
URLS = ['https://www.amazon.co.jp/dp/4297113511',
'https://www.amazon.co.jp/dp/4865940650',
'https://www.amazon.co.jp/dp/4844336479']
# ASIN取得
def get_asin_from_amazon(url):
asin = ""
# ヘッドレスモードでブラウザを起動
options = Options()
#options.add_argument('--headless')
# ブラウザーを起動
driver = webdriver.Chrome(CHROMEDRIVER, options=options)
driver.get(url)
driver.implicitly_wait(10) # 見つからないときは、10秒まで待つ
elem_base = driver.find_element_by_id('ASIN')
if elem_base:
asin = elem_base.get_attribute("value")
else:
print("NG")
# ブラウザ停止
driver.quit()
return asin
if __name__ == '__main__':
# 並列処理
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_to_url = {executor.submit(get_asin_from_amazon, url): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r 例外発生: %s' % (url, exc))
else:
print(data)
マルチスレッド(ThreadPoolExecutor)のサンプルを改良したモノです。
簡単に仕様を説明します。
URLSにはAmazonの商品ページのURLを3個記述しています。
AmazonのURLはアドレスバーからコピーすると恐ろしく長いモノになります。
この記事上では、便宜上短いURL(ASIN付き)となります。
なお、恐ろしく長いURL(ASINがわからない)を想定してプログラムは組んでいます。
だから、わざわざASINを取得する関数が存在しているのです。
get_asin_from_amazon関数で該当URLのページからASINを取得します。
その際、該当URLへはSeleniumを用いてアクセス・スクレイピングしています。
このコードは、実際に並列処理をしているのか?
これに対しては、実際にコードを動かせばわかると思います。
なんと、ブラウザが3個同時に立ち上がります!!
なかなか、面白いですよ。
ただし、あまり動かしすぎないように注意してください。
Amazonへのアクセスが同時に発生していますので。
ちなみに、このままではウザイです。
そのため、ブラウザを見えないようにするには以下のシャープ(#)を除去します。
#options.add_argument('--headless')
なお、Seleniumに関する処理部分でわからないコードがある場合は、以下の記事をご覧ください。
Amazonの商品評価を一気に行う仕様
- AmazonのページへアクセスしてASINを取得
- ASINをもとにサクラチェッカーの評価を取得 ⇒ 表示
- ASINをもとにレビュー探偵の評価を取得 ⇒ 表示
これらが主な機能となります。
そして、今回は2と3を並列処理します。
別に2⇒3と逐次処理でも問題はありません。
でも、なるべくなら早く処理は完了して欲しいです。
それに並列処理を学ぶには、うってつけの機会でもありますしね。
なお、Amazonにおける評価は取得していません。
そもそも、信用できないモノ扱いです。
比較対象としては取得してもいいのかもしれませんね。
でも、今回はナシでいきます。
チェックしたいAmazonの商品ページのURLが、入力する値です。
その入力に対する結果として、以下の2つのサイトにおける評価を得られます。
- サクラチェッカー
- レビュー探偵
それぞれのサイトに対しては、過去にスクレイピング済みです。
各サイトにスクレイピングに関する部分は、記事をご覧ください。
【サンプルコード】Amazonの商品評価
コピペで動くコードです。
今回は、過去のスクレイピングのプログラムをそのまま移植しています。
詳細は、過去記事をご覧ください。
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
URL = "https://www.amazon.co.jp/dp/B07HD71F8N"
SITES = [
"sakura",
"tantei"]
CHROMEDRIVER = "chromedriver.exeのパス"
# ASIN取得
def get_asin_from_amazon(url):
asin = ""
# ヘッドレスモードでブラウザを起動
options = Options()
options.add_argument("--headless")
# ブラウザーを起動
driver = webdriver.Chrome(CHROMEDRIVER, options=options)
driver.get(url)
driver.implicitly_wait(10) # 見つからないときは、10秒まで待つ
elem_base = driver.find_element_by_id('ASIN')
if elem_base:
asin = elem_base.get_attribute("value")
else:
print("NG")
# ブラウザ停止
driver.quit()
return asin
# サクラ度分析の値取得
def get_detail_value(elem):
value = elem.find_element_by_tag_name("div").text
value = value.replace("%", "", 1)
value = value.strip()
return value
# サクラチェッカーをスクレイピング
def get_sakurachecker(asin):
url = "https://sakura-checker.jp/search/" + asin + "/"
# ヘッドレスモードでブラウザを起動
options = Options()
options.add_argument('--headless')
# ブラウザーを起動
driver = webdriver.Chrome(CHROMEDRIVER, options=options)
driver.get(url)
driver.implicitly_wait(10) # 見つからないときは、10秒まで待つ
# 独自の評価
rating = ""
try:
elem_base = driver.find_element_by_class_name("mainBlock")
elem_rating = elem_base.find_element_by_class_name('item-rating')
rating = elem_rating.text
rating = rating.replace("/5", "", 1)
except:
traceback.print_exc()
# サクラ度
try:
elem_sakura_num = driver.find_element_by_class_name("sakura-num")
sakura_num = elem_sakura_num.text
sakura_num = sakura_num.replace("%", "", 1)
except:
traceback.print_exc()
# サクラ度の詳細分析
try:
elem_base = driver.find_element_by_class_name("mainBlock")
elem_circle = elem_base.find_elements_by_class_name('circlecustom')
price_product = get_detail_value(elem_circle[0])
shop_area = get_detail_value(elem_circle[1])
shop_review = get_detail_value(elem_circle[2])
review_distribution = get_detail_value(elem_circle[3])
review_date = get_detail_value(elem_circle[4])
review_reviewer = get_detail_value(elem_circle[5])
except:
traceback.print_exc()
# ブラウザ停止
driver.quit()
print("【サクラチェッカー】")
print("独自評価:" + rating)
print("サクラ度:" + sakura_num)
print("価格・製品:" + price_product)
print("ショップ情報・地域:" + shop_area)
print("ショップレビュー:" + shop_review)
print("レビュー分布:" + review_distribution)
print("レビュー日付:" + review_date)
print("レビュー&レビュアー:" + review_reviewer)
print("----------------------------------")
# レビュー探偵をスクレイピング
def get_reviewtantei(asin):
url = "https://review-tantei.com/detail/" + asin + "/"
try:
res = requests.get(url, headers=headers)
soup = BeautifulSoup(res.content, features='lxml')
# 独自の評価
div_new_review_rating = soup.find(class_="new_review_rating")
rating = div_new_review_rating.find("strong").text
rating = rating.replace("★", "", 1)
rating = rating.strip()
# レビューの信用度と判定結果
div_detail_el_trust = soup.find(class_="detail-el-trust")
trust_score = div_detail_el_trust.find(class_="trust_score").find("strong").text
trust_score = trust_score.replace("%", "", 1)
trust_score = trust_score.strip()
tmp_trust_grade = div_detail_el_trust.find(class_="trust_grade").find("strong").text
tmp_trust_grade = tmp_trust_grade.replace("%", "", 1)
tmp_trust_grade = tmp_trust_grade.strip()
tmp_trust_grade_list = tmp_trust_grade.split(' ')
trust_grade = tmp_trust_grade_list[0]
if ('A' in trust_grade):
trust_grade_2 = 6
elif ('B' in trust_grade):
trust_grade_2 = 5
elif ('C' in trust_grade):
trust_grade_2 = 4
elif ('D' in trust_grade):
trust_grade_2 = 3
elif ('E' in trust_grade):
trust_grade_2 = 2
elif ('F' in trust_grade):
trust_grade_2 = 1
elif ('S' in trust_grade):
trust_grade_2 = 7
else:
trust_grade_2 = 0
# レビュー評価詳細
ul_outer = soup.find(class_="list-group list-group-flush text-left")
li_list_group_item = ul_outer.find_all(class_="list-group-item")
check = []
for i in range(len(li_list_group_item)):
tmp = li_list_group_item[i].find_all(class_="fa")
if len(tmp) == 2:
# 「× ×」
check_point = 1
else:
target_str = str(tmp[0])
if ('text-danger' in target_str):
# 「×」
check_point = 2
elif ('text-warning' in target_str):
# 「!」
check_point = 3
elif ('text-primary' in target_str):
# 「○」
check_point = 4
else:
check_point = 0
check.append(check_point)
# 最終調査
analyzed_at = soup.find(class_="analyzed_at").text
analyzed_at = analyzed_at.replace("最終調査:", "", 1)
analyzed_at = analyzed_at.strip()
except:
print(res)
traceback.print_exc()
print("【レビュー探偵】")
print("独自評価:" + rating)
print("レビューの信用度:" + trust_score)
print("判定結果:" + str(trust_grade_2))
print("レビュー評価詳細:")
print(check)
print("最終調査:" + analyzed_at)
print("----------------------------------")
def get_review(site, asin):
if site == "sakura":
get_sakurachecker(asin)
else:
get_reviewtantei(asin)
if __name__ == "__main__":
asin = get_asin_from_amazon(URL)
# 並列処理
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future_to_site = {executor.submit(get_review, site, asin): site for site in SITES}
ポイントは、get_review関数を設けたところです。
この関数の中でget_sakuracheckerとget_reviewtanteiを呼び分けています。
その際の区別は、SITESの値で行う仕様となります。
別の商品を評価する場合は、以下の値を変更します。
URL = "https://www.amazon.co.jp/dp/B07HD71F8N"
このコードの実行結果は、以下となります。
【レビュー探偵】 独自評価:1.6 レビューの信用度:25 判定結果:2 レビュー評価詳細: [4, 3, 4, 4, 4, 1] 最終調査:2020-05-02 08:43:40 ---------------------------------- 【サクラチェッカー】 独自評価:1.89 サクラ度:90 価格・製品:95 ショップ情報・地域:95 ショップレビュー:0 レビュー分布:35 レビュー日付:35 レビュー&レビュアー:35 ----------------------------------
それぞれ並列でアクセスしています。
レビュー探偵の方が静的コンテンツであるため、先に処理完了となるはずです。
サクラチェッカーはJavaScriptによる動的コンテンツのため、若干待ちが発生します。
ちなみに、Amazonでの該当商品の評価は以下。

レビュー探偵とサクラチェッカーはともに信用できないと評価しています。
つまり、購入してはダメな商品となりますね。




