RCO Study Night #3 Spark・Scala勉強会
CET - Caputer Everything 全サービス横断リアルタイムデータ収集・分析基盤
- 吉田健二
- CETプロジェクト
- "Capture EveryThing"
- リクルータライフスタイルの全サービス横断でリアルタイムにデータ(システムログ、ユーザ行動、在校変動)の収集・分析
- リアルタイムデータ分析に必要な処理を一気通貫で
- エンジニア以外にビジネス系のディレクター、データサイエンティストも
- CETが解決する問題
- サービス・ビジネスに関するあらゆる情報の変化(ユーザ行動、在庫量の変動など)を我々サービス提供者がリアルタイムに把握できていない
- 状況に応じて最適な施策を打てるように
- サービス
- システム構成
- 可視化事例
- GCPの新しいサービス(Pub/Sub, CloudProc, BigTable)
- 何故?
- BigTable選定にあわせて更新系をGCPに置き換え
- EMR -> DataProc, Kinesis -> Pub/Sub
- Cloud Pub/Sub
- Apache Kafka, Amaon Kinesisみたいなやつ
- 2ヶ月運用していて、概ね不満はないがいくつか制限がある
- 1回のPublish上限が1000件まで、緩和はできないっぽい
- tdを並列に走らせたり、td側がで1000件ずつ分割させたりしている(自前でプラグイン書いた)
- Subscribe上限も1度に1000件まで
- ドキュメントに設定があるんだけど、1000件で頭打ちしちゃった
- 1000/回はやっぱりちょっと遅い、Sparkのレシーバクラス側をマルチスレッドでpullのリクエストを投げるように実装中です
- 1回のPublish上限が1000件まで、緩和はできないっぽい
- Spark Streaming + Cloud Pub/Subのサンプルが実用的でなかった
- Spark Streamingのレシーバクラスが必要なのだが独自実装で書かれてなかった
- Recieverクラスを自前で実装しました
- Cloud BigTable
- DataProc
今後検討していること
- 在庫変動データに基づいた、在庫売り切れ予測
- リアルタイム異常検知
Q&A
- リアルタイムUU算出ってどれくらいのパフォーマンス?BigTableは足引っ張ってる?
- near realtimeで出せてます
- BigTable側で変更を通知するAPIがある?
- 通知はしていない
- サービス側から定期的にポーリングをかけて、都度BigTableから持ってきている
- ハイブリッドクラウドの運用上の負荷は出た?
- Pub/SubとKinesisのコスト上の差異は
- 運用が違うが、現状Pub/Subの方が高くなってる
- Kinesisはストリームを作らないといけないが、Pub/Subはいらない
- 同じ量だとどうなんだろう、同じくらいかも...
- Cloud Dataflowにしなかった理由はありますか?
- ウィンドウ集計の要求があって、技術検証を兼ねてSparkを名指しで採用、Dataflowはあまり調べてなかったがもしかしたらソッチのほうがいいのかも
- ウィンドウ集計はどこかに永続化している?
- BigTableのみです
- リアルタイムUU算出ってどれくらいのパフォーマンス?BigTableは足引っ張ってる?
DSP開発におけるSpark MLlibの活用
- 棚橋耕太郎
- DIMSUM; 類似商品の高速な近似計算
- DSPにどう活用されている?
- 10万×10万でやるとやはりデカいので、分散処理したい
- ナイーブにMapReduceで内積を求めると
- MapReduceで気をつけるのはシャッフルサイズ
- ネットワークの計算複雑性
- m:行数、L:行あたりの非ゼロ要素の個数
- シャッフルサイズがO(mL2)に、行数mの増大で計算不可能にまで膨れる
- MapReduceで気をつけるのはシャッフルサイズ
- DIMSUM
- DIMSUMの照明
- Chernoff boundで証明しています
- Twitterでやってみた結果が論文にも乗っている、40%の計算量負荷軽減に
- ちょうどいいのはエラー10%以下、のγ=2logn/ε
- MLlibにおけるDIMSUM
val mat: RowMatrix = new RowMatrix(rows)
val sim= mat.columnsSimilarities(1000)
- word2vec; 単語の特徴ベクトル作成
- GoogleのMikolovさん
- 単語をベクトル化して便利になるツール
- アイテム特徴量の作成に使っている
- アイテムのwebページに含まれる単語の特徴ベクトルを示せる
- あるユーザについてすべて平均化するとユーザの特徴量が出せる
- CVとの関係を回帰学習させられる
- これをWebページについて行う試み
- ページ数3000万、ユーザ数7~8000万、 PV1.6億/日
- ->Spark使ってみました
- MLlibのword2vecはGoogleのC実装からの移植、制限が強い
- Skip gramの階層的なソフトマックスのみ
- ネガティブサンプリングを使いたかったので実装
- 頻出単語に強いです
- じつは単純で、似非ディープラーニング
- MLlibのword2vec cont'd
- ちゃんと動きません!
- 単語ベクトルが発散してしまう
- データシャードに分けて学習してReduceするときに、全部のパラメータを足しちゃう
- 単純平均にして、すべてのパラメータをパーティション数で割って計算させて劇的に改善しました
- 単純平均だと学習速度が低い
- parameter server型モデル
- '13の論文
- 1箇所(parameter server)にパラメータを置く
- ○パラメータ次元が大きくても分散保持できる、収束速度向上
- アドテク業界だとロジスティック回帰で10Bになったりするので、結構なアドバンテージです
- ✕sparkは非対応、通信コスト大
- ○パラメータ次元が大きくても分散保持できる、収束速度向上
- Sparkでparameter serverを使えるDist-MLをintelが出してるが、イマイチ早くならなかったです
- ちゃんと動きません!
- GoogleのMikolovさん
- Splash; 分散word2vec学習の高速化
- パラメータをノードでこべつに学習、重み付けすることで高い学習効率
- Splashの戦略
- Splashの収束性能
- SGDやL-BFGSに比べて非常に速い、100倍位早くなってるように見える
- まとめ
- DIMSUMなどの分散処理向けアルゴリズムをうまく使いこなしたい
- MLlibは便利だが、注意深く使わないと性能が発揮できない
- MLlib以外の選択肢も考慮にいれるといいかも
- 完成されたもの、というわけではない
- Q&A
- DIMSUM、以前やったら商品数/ユーザ数の増加に応じて加速度的にメモリ消費量が増えた、具体的な数字はありますか
- ノード数を増やすほど速くはなるが、EMRのm3.large上でエクゼキュータは4台、Sparkメモリサイズはあんまり大きくしていない
- ソートするときのほうが時間がかかっていた ←Driverのメモリに依存
- アイテム数などに応じた学習までの計算時間は、系統的なのは出せないが30分とか
- 協調フィルタリング以外の類似度を求めるアルゴリズムとくらべて結果でどれくらい差がでましたか
- cos類似度以外にはやってない、アイテムの類似度を使ってコンテンツを...とかはしてないです
- 汎用的に作るためにアイテムのコンテンツには着目していない、R社はめちゃくちゃなバリエーションなので...
- データが1回以上訪れられた時に、訪れられていたかどうかのカウント
- これは結構スパース、あるていど行列が抑えられていた状態で計算しているというイメージです
- 広告アイテムの推薦、のゴールは期待した値が出たらOK?何かしらのしきい値がある?
- 今回は実際に総当り計算した時との差です
- 最終的なゴールはcos類似度よりは実際のtop100とかとの差異を縮めていく方向
- DIMSUM、以前やったら商品数/ユーザ数の増加に応じて加速度的にメモリ消費量が増えた、具体的な数字はありますか
R使いがSparkを使ったら
- 早川敦士
- R言語のからみるSpark
- Strata Hadoop
- カンファレンスのメインテーマはSpark
- 行った後のぼく
- 時代はSparkだ!
- R言語ではさばけない大規模データをSparkで処理したい
- 馴染み深いDataFrameも使える!
- R言語ユーザにとってお馴染みのdata.frame
- SparkにおけるDataFrame
- Spark DataFrameとR言語の比較
- データの読み込み
- Rなら1コマンド、Sparkは自前パースか別のライブラリ
- 最初の?行
df %>% head(1)
df.limit(1)
- dataframeのhead()だと結果が帰ってこないのでlimit()じゃないと困る
- 列の選択はどちらもselect
- 条件に一致する行
df %>% filter(Sepal_Length > 5)
df.filter($"Sepal_Length".gt(5)
- groupByで行数カウント
- Sparkは
agg()
でまとめてもOK
- Sparkは
- ソート
- Rは
arrange
、Sparkはsort
- Rは
- ランク
- Rは
rank
、SparkはHiveContext
と書きながらSQLContext
を使わないと動かないようですalias()
とas()
はソースを見ると中身はほぼ一緒でした
- Rは
- partitionBy
df()
は$""
の方がイケてる、と先輩から突っ込まれました- rowNumber
- pgSQLはよく使うかも
- Rだと
n
に何行目か入っている - Sparkは
rowNumber
を呼ばないといけない
- 差分を求める
lag()
- 累積和
- Rは
cumsum()
- Sparkは
sum()
でイケるみたいです
- Rは
- inner join
- あらかじめデータ側でやってることが多そうですが
- R:
inner_join()
- Spark:
join().where($"..." === $"..." && $",,," === $",,,")
- データの読み込み
- 今後のSparkはRDDではなく、DataFrameがメインになっていく可能性が高いので、明日から使おう!!
- Q&A
- DataFrameにおいてRにあってSparkにない機能はありました?
- ドキュメントが揃ってなくて使いづらい以外は大丈夫かなと
- SparkRは今回は調べてないです
- DataFrameにおいてRにあってSparkにない機能はありました?