読者です 読者をやめる 読者になる 読者になる

RCO Study Night #3 Spark・Scala勉強会

CET - Caputer Everything 全サービス横断リアルタイムデータ収集・分析基盤

  • 吉田健二
    • リクルートコミュニケーションズ
    • リクルートライフスタイル出向
    • '15 7月中途入社
    • 8年Webアプリ、この6月からこの基盤開発に従事
  • CETプロジェクト
    • "Capture EveryThing"
    • リクルータライフスタイルの全サービス横断でリアルタイムにデータ(システムログ、ユーザ行動、在校変動)の収集・分析
    • リアルタイムデータ分析に必要な処理を一気通貫
    • エンジニア以外にビジネス系のディレクター、データサイエンティストも
  • CETが解決する問題
    • サービス・ビジネスに関するあらゆる情報の変化(ユーザ行動、在庫量の変動など)を我々サービス提供者がリアルタイムに把握できていない
    • 状況に応じて最適な施策を打てるように
  • サービス
  • システム構成
    • 各サービスからSETプロジェクトへtd-agent
      • -> BigQuery
      • -> Es,Kibana
      • -> S3
      • -> ELB -> td-agent -> Cloud Pub/Sub -> Cloud Dataproc Spark
        • -> Bigtable -> API->ELB -> td-agent ->集計結果をサービスへ還元
    • 1日あたり一億数千万件
  • 可視化事例
    • Airレジ
      • コールセンタでリアルタイムにログをモニタリング
      • アプリケーションのスローダウンやユーザ操作の戸惑いなど、ユーザビリティに関する情報を迅速に検知し、顧客サポート品質向上に努める
    • じゃらん
      • Spark Streamingでウィンドウ集計
      • 定期的に直近のユーザ行動ログを集計し、宿ページごとのUU数をリアルタイムに算出
    • サービス共通のパフォーマンス監視
      • ログを定期的にウィンドウ集計、特定以上の処理時間のあるURIをアラート
  • GCPの新しいサービス(Pub/Sub, CloudProc, BigTable)
    • 何故?
      • 昔からAWSを使う慣習があったが、GCPの方が安いので
      • スループットが要求されだしたあたりでDynamoDBからBigTableに置き換え
        • 1/10コストで同じ処理速度が実現できる
    • BigTable選定にあわせて更新系をGCPに置き換え
      • EMR -> DataProc, Kinesis -> Pub/Sub
    • Cloud Pub/Sub
      • Apache Kafka, Amaon Kinesisみたいなやつ
      • 2ヶ月運用していて、概ね不満はないがいくつか制限がある
        • 1回のPublish上限が1000件まで、緩和はできないっぽい
          • tdを並列に走らせたり、td側がで1000件ずつ分割させたりしている(自前でプラグイン書いた)
        • Subscribe上限も1度に1000件まで
          • ドキュメントに設定があるんだけど、1000件で頭打ちしちゃった
          • 1000/回はやっぱりちょっと遅い、Sparkのレシーバクラス側をマルチスレッドでpullのリクエストを投げるように実装中です
      • Spark Streaming + Cloud Pub/Subのサンプルが実用的でなかった
        • Spark Streamingのレシーバクラスが必要なのだが独自実装で書かれてなかった
        • Recieverクラスを自前で実装しました
    • Cloud BigTable
      • DynamoDBにくらべて圧倒的なコスパ
      • 機能面の不満
        • TTL機能がちょっと微妙
          • 最短寿命の指定はできるが、超えると必ずクリアされる訳ではない
          • APIコール側で吸収しました
        • 結構落ちる
          • 内部のバージョンアップで落ちました、と半日〜1日落ちてから連絡が来ました
          • まだまだベータ版、収益サービスに載せるには不安
            • ベータなのはDataProcも同じ
    • DataProc
      • DataProc1.0 -> Spark1.5.0
        • DataProc1.2(Spark1.5.2)でBigTableに繋げられなかった、クラスパスが通ってなかった
        • 2,3日後に修正されました
      • クラスタ構築速度はやっぱりすごい
        • 1分くらい
      • 安定しない、収益サービスに載せるのは怖い
  • 今後検討していること

    • 在庫変動データに基づいた、在庫売り切れ予測
    • リアルタイム異常検知
  • Q&A

    • リアルタイムUU算出ってどれくらいのパフォーマンス?BigTableは足引っ張ってる?
      • near realtimeで出せてます
    • BigTable側で変更を通知するAPIがある?
      • 通知はしていない
      • サービス側から定期的にポーリングをかけて、都度BigTableから持ってきている
    • ハイブリッドクラウドの運用上の負荷は出た?
      • やっぱり統一されて無くて面倒くさい
      • できたらGCPに寄せたいなあとは思うが、社内はAWSよりなので遠そう
    • Pub/SubとKinesisのコスト上の差異は
      • 運用が違うが、現状Pub/Subの方が高くなってる
      • Kinesisはストリームを作らないといけないが、Pub/Subはいらない
      • 同じ量だとどうなんだろう、同じくらいかも...
    • Cloud Dataflowにしなかった理由はありますか?
      • ウィンドウ集計の要求があって、技術検証を兼ねてSparkを名指しで採用、Dataflowはあまり調べてなかったがもしかしたらソッチのほうがいいのかも
    • ウィンドウ集計はどこかに永続化している?

DSP開発におけるSpark MLlibの活用

  • 棚橋耕太郎
  • DIMSUM; 類似商品の高速な近似計算
    • DSPにどう活用されている?
    • 10万×10万でやるとやはりデカいので、分散処理したい
    • ナイーブにMapReduce内積を求めると
      • MapReduceで気をつけるのはシャッフルサイズ
        • ネットワークの計算複雑性
      • m:行数、L:行あたりの非ゼロ要素の個数
      • シャッフルサイズがO(mL2)に、行数mの増大で計算不可能にまで膨れる
    • DIMSUM
      • ナイーブではないMapReduce内積をを求める
      • エミットするかどうかをある確率(オーバーサンプリングパラメータ: γ)で決める、ノルムが大きいベクトル要素はサンプルせずにスキップ
        • シャッフルサイズはnlognに、mに依存しなくなる
        • n:ユーザ数
    • DIMSUMの照明
      • Chernoff boundで証明しています
      • Twitterでやってみた結果が論文にも乗っている、40%の計算量負荷軽減に
      • ちょうどいいのはエラー10%以下、のγ=2logn/ε
    • MLlibにおけるDIMSUM
      • val mat: RowMatrix = new RowMatrix(rows)
      • val sim= mat.columnsSimilarities(1000)
  • word2vec; 単語の特徴ベクトル作成
    • GoogleのMikolovさん
      • 単語をベクトル化して便利になるツール
    • アイテム特徴量の作成に使っている
      • アイテムのwebページに含まれる単語の特徴ベクトルを示せる
      • あるユーザについてすべて平均化するとユーザの特徴量が出せる
      • CVとの関係を回帰学習させられる
    • これをWebページについて行う試み
      • ページ数3000万、ユーザ数7~8000万、 PV1.6億/日
      • ->Spark使ってみました
    • MLlibのword2vecはGoogleのC実装からの移植、制限が強い
      • Skip gramの階層的なソフトマックスのみ
      • ネガティブサンプリングを使いたかったので実装
        • 頻出単語に強いです
        • じつは単純で、似非ディープラーニング
    • MLlibのword2vec cont'd
      • ちゃんと動きません!
        • 単語ベクトルが発散してしまう
        • データシャードに分けて学習してReduceするときに、全部のパラメータを足しちゃう
        • 単純平均にして、すべてのパラメータをパーティション数で割って計算させて劇的に改善しました
      • 単純平均だと学習速度が低い
        • ⊿θをデータ数で割っている、データの凸性が低いと並列化の高速化が望めなくなる
        • 学習中にパーティション間でパラメータの同期ができないか?
          • Googleオリジナル版は共有メモリで非同期に更新している
          • 同じことをMLlibでやるために↓
      • parameter server型モデル
        • '13の論文
        • 1箇所(parameter server)にパラメータを置く
          • ○パラメータ次元が大きくても分散保持できる、収束速度向上
            • アドテク業界だとロジスティック回帰で10Bになったりするので、結構なアドバンテージです
          • ✕sparkは非対応、通信コスト大
        • Sparkでparameter serverを使えるDist-MLをintelが出してるが、イマイチ早くならなかったです
  • Splash; 分散word2vec学習の高速化
    • パラメータをノードでこべつに学習、重み付けすることで高い学習効率
      • UCバークレーamplabのOSSライブラリ
      • 既存のMLlibの簡単に組み合わせて使える
      • ノード間通信は1回/iteration
      • LDA,協調フィルタリングSGD,Logistic回帰など確率系の実装が既にあります
      • この夏くらいに出たライブラリです
    • Splashの戦略
      • 更新⊿をm倍したSiで擬似的に全体データの近似として与える
      • すべての更新幅をθoldに足す
      • 誤差は(4G22 Tmn)
      • 論文に威力が証明されている
        • 2次元の最適化問題
          • 単純加算は誤差20(発散)
          • 単純平均で-0.6、Splashは-0.1未満の範囲に収まる
      • RDDをちょっと変えた、Parameterized RDDに読み込ませる
      • shared variableに更新したい値をaddで単純に足していくだけで更新していける
    • Splashの収束性能
      • SGDやL-BFGSに比べて非常に速い、100倍位早くなってるように見える
  • まとめ
    • DIMSUMなどの分散処理向けアルゴリズムをうまく使いこなしたい
    • MLlibは便利だが、注意深く使わないと性能が発揮できない
    • MLlib以外の選択肢も考慮にいれるといいかも
      • 完成されたもの、というわけではない
  • Q&A
    • DIMSUM、以前やったら商品数/ユーザ数の増加に応じて加速度的にメモリ消費量が増えた、具体的な数字はありますか
      • ノード数を増やすほど速くはなるが、EMRのm3.large上でエクゼキュータは4台、Sparkメモリサイズはあんまり大きくしていない
      • ソートするときのほうが時間がかかっていた ←Driverのメモリに依存
      • アイテム数などに応じた学習までの計算時間は、系統的なのは出せないが30分とか
    • 協調フィルタリング以外の類似度を求めるアルゴリズムとくらべて結果でどれくらい差がでましたか
      • cos類似度以外にはやってない、アイテムの類似度を使ってコンテンツを...とかはしてないです
      • 汎用的に作るためにアイテムのコンテンツには着目していない、R社はめちゃくちゃなバリエーションなので...
    • データが1回以上訪れられた時に、訪れられていたかどうかのカウント
      • これは結構スパース、あるていど行列が抑えられていた状態で計算しているというイメージです
    • 広告アイテムの推薦、のゴールは期待した値が出たらOK?何かしらのしきい値がある?
      • 今回は実際に総当り計算した時との差です
      • 最終的なゴールはcos類似度よりは実際のtop100とかとの差異を縮めていく方向

R使いがSparkを使ったら

  • 早川敦士
    • リクルートコミュニケーションズ新卒1年目
    • アドホック分析や可視化ツール開発
    • 大学時代は品質管理・信頼性工学など
    • Japan.R 2015主催
      • 普段はR言語のコミュニティにいます
  • R言語のからみるSpark
  • Strata Hadoop
    • カンファレンスのメインテーマはSpark
    • 行った後のぼく
      • 時代はSparkだ!
      • R言語ではさばけない大規模データをSparkで処理したい
      • 馴染み深いDataFrameも使える!
  • R言語ユーザにとってお馴染みのdata.frame
    • R言語ユーザはdata.frameで生活している
    • google trendでも「data.frame」うなぎのぼり
    • dplyrどっぷり
      • iris %>% head(2)
        • 世の中には100行くらい書いてる人もいるらしい
        • こういうので普段から集計しています
          • これをSparkでやるには...?
  • SparkにおけるDataFrame
    • sql-programming-guide.html / docs/latestにだいたいのことは書いてある
      • 困ったらScalaDocへ潜る
    • 現在はSparkCore/DataFrameAPIからRDD->org.apache.spark.mllib
    • 将来はDataFrameAPIからorg.apache.spark.mlに書き換わるのかも?
  • Spark DataFrameとR言語の比較
    • データの読み込み
      • Rなら1コマンド、Sparkは自前パースか別のライブラリ
    • 最初の?行
      • df %>% head(1)
      • df.limit(1)
        • dataframeのhead()だと結果が帰ってこないのでlimit()じゃないと困る
    • 列の選択はどちらもselect
    • 条件に一致する行
      • df %>% filter(Sepal_Length > 5)
      • df.filter($"Sepal_Length".gt(5)
    • groupByで行数カウント
      • Sparkはagg()でまとめてもOK
    • ソート
      • Rはarrange、Sparkはsort
    • ランク
      • Rはrank、SparkはHiveContextと書きながらSQLContextを使わないと動かないようです
        • alias()as()はソースを見ると中身はほぼ一緒でした
    • partitionBy
      • df()$""の方がイケてる、と先輩から突っ込まれました
      • rowNumber
        • pgSQLはよく使うかも
        • Rだとnに何行目か入っている
        • SparkはrowNumberを呼ばないといけない
    • 差分を求める
      • lag()
    • 累積和
      • Rはcumsum()
      • Sparkはsum()でイケるみたいです
    • inner join
      • あらかじめデータ側でやってることが多そうですが
      • R: inner_join()
      • Spark: join().where($"..." === $"..." && $",,," === $",,,")
  • 今後のSparkはRDDではなく、DataFrameがメインになっていく可能性が高いので、明日から使おう!!
  • Q&A
    • DataFrameにおいてRにあってSparkにない機能はありました?
      • ドキュメントが揃ってなくて使いづらい以外は大丈夫かなと
    • SparkRは今回は調べてないです