分析プロセスを並列実行する

はじめに

ここでは Jupyter Notebookを用いて対話的に、分析プロセス(分析の処理手順)の設計と分析モデル作成を行います。 分析プロセスの設計と分析モデル作成には、SAMPO API(異種混合学習技術を用いた分析ソフトウェア SAMPO/FAB の Python API)を使用します。 分析モデルを作成する分析プロセスを並列に実行する例で説明します。

データを準備する

  1. ユーザがアクセス可能なディレクトリにsampo-getting_started.zipを格納します。

  2. 格納したファイルを解凍します。

    $ unzip sampo-getting_started.zip -d ~/work
    

Jupyter Notebook上で分析する

Jupyter Notebook上で、 [work/examples] ディレクトリに移動してください。

notebookを作成するため、Jupyter Notebook上で [New] ボタンを押下し、 [Python 3] を選択します。

1. 分析プロセス設計

分析プロセス設計として、SPD (プロセス記述: 分析プロセスの設計情報) とSRC (プロセス実行設定: 分析プロセスの実行指示情報) を作成します。

In [1]:
import logging
import pandas as pd
from sampo.api import sampo_logging, gen_spd, gen_src
from sampotools.api import gen_asd_from_pandas_df

sampo_logging.configure(logging.INFO, filename='./getting_started-session_run.log')

learn_df = pd.read_csv('./data/fabhmerg_learn.csv', na_values='?')
predict_df = pd.read_csv('./data/fabhmerg_predict.csv', na_values='?')
asd = gen_asd_from_pandas_df(learn_df)

# プロセス記述(SPD)の定義
spd_content = '''
dl -> std  -> rg
   -> bexp -> rg

---

components:
    dl:
        component: DataLoader

    std:
        component: StandardizeFDComponent
        features: scale == 'real' or scale == 'integer'

    bexp:
        component: BinaryExpandFDComponent
        features: scale == 'nominal'

    rg:
        component: FABHMEBernGateLinearRgComponent
        features: name != 'price'
        target: name == 'price'
        standardize_target: True
        tree_depth: 3

global_settings:
    keep_attributes:
        - price
    feature_exclude:
        - price
'''
spd = gen_spd(template=spd_content)

# 学習用のプロセス実行設定(SRC)のテンプレートを定義
learn_src_templ = '''
learn_rand{{ random_restart }}:
    type: learn
    data_sources:
        dl:
            df: {{ df }}
            attr_schema: {{ attr_schema }}
'''

# 予測用のプロセス実行設定(SRC)のテンプレートを定義
predict_src_templ = '''
predict_rand{{ random_restart }}:
    type: predict
    data_sources:
        dl:
            df: {{ df }}
            attr_schema: {{ attr_schema }}
    model_process: learn_rand{{ random_restart }}
'''

process_list = []  # 並列実行するプロセスを保存するリスト
# 学習はランダムな初期状態に依存するため複数回準備する
random_restart_times = 3
for i in range(random_restart_times):
    learn_src_param = {'random_restart': i, 'df': learn_df, 'attr_schema': asd}
    learn_src = gen_src(template=learn_src_templ, params=learn_src_param)
    predict_src_param = {'random_restart': i, 'df': predict_df, 'attr_schema': asd}
    predict_src = gen_src(template=predict_src_templ, params=predict_src_param)
    process_list.append((learn_src, spd))  # 学習用の分析プロセスの追加
    process_list.append((predict_src, None))  # 予測用の分析プロセスの追加

2. 分析モデル作成

複数の分析プロセス実行を並列に実行します。

In [2]:
from sampo.api import process_runner, process_store

pstore_url = './parallel_pstore'
process_store.create(pstore_url)

process_runner.session_run(process_list, pstore_url=pstore_url, max_workers=3)

process_store.list_process_metadata(pstore_url)
Out[2]:
process name version started at running time status
0 learn_rand0 3a8ab69f-1227-405c-8575-94ff831d9685 2019-01-31 19:31:01.737904 00:00:02.741186 Succeeded
1 learn_rand1 028679ea-ea1e-4001-ac3d-d70108f5f4e2 2019-01-31 19:31:01.724090 00:00:02.524713 Succeeded
2 learn_rand2 38f88cd9-5f2f-4d17-a134-0b5457ffe653 2019-01-31 19:31:01.758928 00:00:02.376406 Succeeded
3 predict_rand0 788d098d-b54b-4530-9a51-c5fc5dd0446b 2019-01-31 19:31:04.583651 00:00:01.006656 Succeeded
4 predict_rand1 f259ea93-821f-4bcd-a8a6-bf9524def556 2019-01-31 19:31:04.458157 00:00:01.133541 Succeeded
5 predict_rand2 dddf7b70-29fc-4ed9-bbee-d254395c89cb 2019-01-31 19:31:04.221410 00:00:01.371365 Succeeded

3. 結果評価

RMSEが最小な分析モデルを良い分析モデルとするため、各分析モデルのRMSEを一覧します。

In [3]:
import re
import pandas as pd
from sampo.api import process_store

result = []
predict_proc_names = [src.name for src, _ in process_list if re.match('predict*', src.name)]
for predict_proc_name in predict_proc_names:
    row = {}
    with process_store.open_process(pstore_url, predict_proc_name) as prl:
        evaluation = prl.load_comp_output_evaluation('rg')
        row['process_name'] = predict_proc_name
        row['rmse'] = evaluation['root_mean_squared_error'][0]
        result.append(row)

pd.DataFrame(result).sort_values(by='rmse')
Out[3]:
process_name rmse
0 predict_rand0 2895.264530
1 predict_rand1 3496.908605
2 predict_rand2 3986.986632