今回も引き続き Google Cloud Next '17 in Tokyo (2日目)で聴講した機械学習系(と BigQuery)のセッションのメモを書いていきます。
初日分はこちらです。
※ 聞き違いや理解不足から一部不正確な情報が載っている可能性がありますがご容赦ください。
Google 技術専門集団による「GCP 活用事例と今後の可能性」と「TensorFlow とエンジニアのパフォーマンスを最大限に引き出す Cloud ML Engine」
URL : Google Cloud Next | October 11-13, 2022
GCP 活用事例と今後の可能性
- GCP 採用メリット : 金額が安い
- ある事例では AWS から 30% 削減できた
- リザーブドインスタンスからの脱却(課金単位が時間から分単位へ)
- 事例紹介1:店舗内顧客行動のデータ化
- POS では売れなかったデータはわからない
- 機会損失を減らして売上をあげたい
- Web カメラで店内を解析して顧客行動を分析
- BigQuery で分析
- OpenCV も使って滞在時間や購買意欲を実映像の上に可視化
- ラズパイで高解像度動画を1時間単位でバッファリング
- Web カメラを GCP へ直接アップロード
- Windows PC の代替・省スペース化など
- システム構成
- GCE : 解析ソフト搭載インスタンス
- BigQuery : 解析後データ保持・分析
- GCS : システム導入前過去データ解析
- GAE : 管理 Web システム
- 売上10%アップ
- 効果的な POP 配置把握と欠品対応
- 来店数と売上の関係性の裏に売れなかった理由の仮説を見える化
- 今後の方向性
- 店舗別分析と立地などの属性をモデル化(店舗ごとの売上予測など)
- 時間別の人員配置と行動の定義化
- 棚割の見直しと欠品削減
- 事例紹介2:次世代冷蔵庫開発と食材消費動向把握
- コモディティ化する白物家電に価値の創造と新しいビジネスモデルの模索
- 前面上下2画面のディスプレイ付き冷蔵庫を提案
- Android
- バーコードなどから商品データを把握
- 連携企業間とのアプリ・コンテンツ共有
- システム構成
- GAE : 全端末管理・コンテンツ事業者とのサービス連携・管理 Web システム
- GCS : メモ・動画・静止画保存
- BigQuery : ログ保存・分析
- 効果
- コンセプトモデルとして新しい世界観を創出
- 食品消費モデルと POS 連携把握
- 同型他モデルの販売とパートナーエコシステムの開拓
- GCP を使うと実証実験が簡単にできる
- 新規システムに効果を発揮しやすい
- 技術者をリスペクトしてほしい、彼らが作りたいものをまず作ってもらいたい
- まずは何も考えずデータを溜めるようにしておいて、それ自体に価値をつけることは先送りできる
- 機械学習への移行
- お客様とエンジニアが楽しそうに共創していたのが印象的だった
- 採用するリスクよりも採用しないリスクを気にしたい
TensorFlow とエンジニアのパフォーマンスを最大限に引き出す Cloud ML Engine
- TensorFlow
- ディープラーニングなどのアルゴリズムを実装するための数値計算ライブラリ
- GPC を活用した高速計算や分散処理
- GPU クラスタを整えたりデプロイするようにしたり自分で動作環境を作るのは辛い
- Cloud ML Engine
- Tensorflow を動かす最適な環境を用意してくれる
- GPU を使った計算環境
- Google の高速なネットワークを使った分散処理
- Tensor Processing Unit (TPU)
- Tensorflow 専用のプロセッサ(2017/05 発表)
- NN の計算に特化 (15 ~ 30倍の性能、電力性能ひ 30 ~ 80倍)
- テスター募集中
- データエンジニアがやるべきことは Tensorflow のコードを Cloud ML Engine にサブミットすることだけ
- そのあとトレーニング
- 学習データや学習後のモデルの保存は GCS へ
- 出来上がったモデルを ML Engine にデプロイすると Web API を通して使える
- もちろんスケールする
- Tensorflow を動かす最適な環境を用意してくれる
Google Cloud Dataflow で実現するストリームおよびバッチデータ処理
URL : Google Cloud Next | October 11-13, 2022
- Cloud Dataflow とは
- ストリーム及びバッチの統合処理モデル
- OSS になっている (Apache Beam)
- オートスケール及び動的タスクシャーディング
- BigQuery / Bigtable / GCS など GCP のエコシステムと容易に連携
- データ処理の歴史
- MapReduce : シンプルスケーラビリティ(2003)
- FlumeJava : パイプライン・最適化・オーケストレーション(2010)
- MillWheel : ストリーミング・低遅延(2013)
- Cloud Dataflow : バッチ + ストリーム統合モデル・マネージド実行エンジン(2015)
- Dataflow の特徴
- マネージドサービス
- フルマネージドの自動構成
- 最適な実行パスになるようにグラフを最適化
- ビジネスロジックとチューニングのためのコードが混じりがち
- メンテナンス性が下がるし、プログラマはビジネスロジックに集中したい
- ジョブ実行中のオートスケール
- Dataflow のフェーズによって必要なリソースは変わってくる
- 上部実行中の動的ワークリバランス
- 遅いノードに引っ張られて全体が遅くなるので、時間のかかっているタスクを他のノードに分け与える
- プログラミングモデル + SDK
- The world beyond batch : Streaming 101
- 現代はスピードが増し、データがますます増えてきた
- データは本質的には次から次へ発生する
- システム処理の都合上、教会を定義するというのが従来の手法(バッチ処理)
- bounded data / unbounded data
- これらの課題を処理するための新しいモデル
- The world beyond batch : Streaming 101
- マネージドサービス
- データは無限に発生し、処理に遅延が発生することもある
- 8:00に発生したイベントが13:00に処理されるなど
- Element-wise(要素ごと)の変換
- ウインドウに分割しての集計
- イベントが発生した時間と集計タイミングが合わない(ウインドウから取りこぼす)ことがあり得る
- 遅れてきたデータをどう扱うか?という問題
- イベントタイムと処理タイムという概念
- イベントタイム: イベントが実際に発生した時間
- 処理タイム: イベントがシステムによって確認された時間
- Dataflow モデル4つの質問
- What : 何を計算するのか
- Element-wise
- Aggregating
- Composition
- Where : どこでデータを区切るのか
- Fixed window
- Sliding window
- Session window
- When : いつ処理をするのか
- Watermark
- イベントタイムがどこまで処理されたかを示すタイムポイント
- ヒューリスティックに設定する(ウォーターマークよりも古いデータは現れない)
- 早すぎるといくつかのデータを取り損ねる
- 遅すぎると結果の出力が遅れる
- withEarlyFirings と withLateFirings で調整ができる(on-time / early / late)
- Watermark
- How : どのようにデータを補正するのか
- 遅れてきたデータを足したり、無視したり、あるいは正規のデータとしたりできる
- What : 何を計算するのか
- クラシックバッチ / ウィンドウバッチ / ストリーミング / ストリーミング + 補正 の4パターンが表現可能
- 事例紹介 : 仮想通貨 c0banを用いた動画広告サービス
- 広告ダッシュボード (動画表示回数 / 完全視聴数 / ユーザ数 / 評価)
- Firebase -> BigQuery を最初考えたが直接ダッシュボードから参照するようにするとコスパが悪い
- フロントに App Engine : Pub/Sub へのアクセスを API 化するために置いた
- Cloud SQL : アプリ用のサマリデータを保存
- BigQuery : 社内向けの全件データを保持
- Dataflow モデル
- 何を : Impression 数:コンテンツとアクションでキーづけられた整数の合計
- どこで : 1時間のイベント時間の固定ウィンドウ内
- いつ (の時点で結果を実体化するか) : 1分ごとに更新し、2時間の遅延まで認める
- どのように(結果の更新を扱うか) : 遅れてきたイベントも加算する
- 全てのデータは Pub/Sub でやってきてそれを途中で2経路のフローに分岐させて処理
- 一つはダッシュボード用に集計値としてサマるフロー
- もう一つは BigQuery 用にそのまま突っ込むフロー
- 設計したパイプラインはチューニングの必要がないくらい優秀だった
- Pub/Sub に 8,000 リクエスト/秒(クオータ制限上限)を投げた
- 1インスタンスから4インスタンスに自動スケールアウト
- 3,200リクエスト/秒 -> 14,300リクエスト/秒にリニアにスケール
- 参考値としてバルスのツイート 14.3万ツイート
- 広告ダッシュボード (動画表示回数 / 完全視聴数 / ユーザ数 / 評価)
- Apache Beam 自体は Apache Spark を始め色々な実行環境で動作する
- Beam = Batch + Streaming
- Cloud DataFlow では Python SDK も動作する
BigQuery と Cloud Machine Learning : 大規模ニューラル ネットワークによる予測
URL : Google Cloud Next | October 11-13, 2022
- BigQuery
- 5,000 ~ 10,000 台のディスクで 1TB を1秒でスキャン
- Google Cloud : The Datacenter as a Computer
- データセンターを超並列計算機として扱える設計担っている
- Jupiter : Google が開発したネットワークファブリック
- 10G x 10万ポート = 1.2 Pbps
- マイクロ秒のレイテンシでサーバを集約
- 数百台のサーバとペタビットネットワークによる超並列処理クエリ
- Datacenter as a computer だからこそ実現できる技術
- データの民主化
- 昔はアプリケーションエンジニアですら(DB が死ぬことがあるので)気軽に DB にクエリを投げさせてもらえなかった
- BigQuery ではコスト上限さえかけておけば非効率なクエリもどんどん実行してOK
- Google Cloud Datalab
- Jupyter に GCP 周りのコマンドを統合しているので BQ のクエリもコード中に書いて呼び出せる
- Queryit Smart : シグネチャによるスマートな分析
- 特徴ベクトルによるドキュメントの類似検索
- stack overflow の投稿 1000万件に対して、正規表現マッチングを使った no index での比較が18秒で実行可能
- 何をやっているのか
- 単語に分割
- 前処理も UDF で行なっていて、これも BigQuery を使うと速い
- シグネチャ(特徴ベクトル)を抽出
- tf-idf で重要そうな単語を抽出、数秒でできる
- シグネチャ間の類似度を計算する UDF を定義
- コサイン類似度を計算
- SQL では書きにくいところを UDF(JS) で記述
- UDF を使って類似のドキュメントを検索
- 単語に分割
- ML Engine でスマートな分析
- 人の感覚や直感を SQL で表現するのは難しい
- HyperTune というパラメータチューニング支援もある
- BigQuery を使った画像検索の例
- Wikimedia の画像100万点をGCSにインポート
- ML Engine で CNN(VGG16) を適用
- ラベル分類をしてしまう直前(最終FC層)の特徴量をそのまま突っ込んで類似度をUDFで比較
- レンタルサイクルの需要予測
- UDF で多層パーセプトロンを定義し、GCS に保存したモデル(重みとバイアス)を読み込んでクエリで使うことまでできる
- GCS から読めるデータサイズは 5MB まで
- UDF にしてしまえば誰もが機械学習のモデルをクエリから利用することができる
- Queryit Smart はソースコードを GitHub から入手できる
BigQuery の先進機能 : クラウド データウェアハウスの未来を開く鍵
URL : Google Cloud Next | October 11-13, 2022
- Google BigQuery Analytics の著者
- クエリが遅い場合があることに気づいても(ブラックボックスなので)改善方法がわからない
- 実際のクエリ実行やクエリプランを通してアーキテクチャを実践的に解説
- Dremel のアーキテクチャ
- 発表当時は static tree だったが今は dynamic tree になっている
- Colossus というカラム指向の分散ストレージを使っている
- UI に表示されるクエリプランがチューニングにおいて有益
- SELECT COUNT() の例では partial count が shard ノードで先に行われてそのあと sum される
- 実行計画でも2ステージに分かれていることが確認できる
- Stage timing に wait / read /compute / write が表示されている
- 分散実行なのでノード間の平均実行時間や最大実行時間がどれだけ違うかを確認できる
- Input / Output rows が表示されていてどれだけのスロットを消費したかがわかる
- Scheduler がリソース割り当てを行う
- BigQuery におけるスロット
- スケジューリング対象となる BigQuery の並列実行可能な作業単位
- スループットが指標
- ジョブの最中に不要になったらキャンセル、また遅延やエラーの場合はリスタートされる
- デフォルトでは1プロジェクトについて 2,000 スロットが割り当てられる
- プロジェクト間・クエリ間でそれぞれフェアスケジューリングされる
- さらにクエリのステージごとにスケジューリングされ、特定のジョブに対するスロット数を知るのはデータとクエリの複雑度に依存するため難しい
- シャッフル
- カーディナリティの高いデータの集計においてシャッフルで近い値を同じノードに格納するようにすることで中間レイヤーの並列度を上げる
- 中間レイヤーの集計対象を区切る
- ORDER BY + LIMIT の場合は中間レイヤーの中で件数を絞れる(N件以上返す必要がない)ためスケーラブル
- クエリプラン上では
WRITE TO __SHUFFLE / BY HASH
のように何を key / value としてシャッフルが起きているか確認できる
- カーディナリティの高いデータの集計においてシャッフルで近い値を同じノードに格納するようにすることで中間レイヤーの並列度を上げる
- マルチシャッフル
- GROUP BY のキーが複数になると HASH キーが複数になる
- Stage 1 の compute が減る一方で wait する時間が長くなる
- Stage 2 の compute は当然増える
- シャッフルの tips
- ステージ N から N+1 へのマッピングが統計的に決定できない場合必ずシャッフルが行われる
- シャッフルは(Jupiter や Colossus などの恩恵を受けて)超高速に行われる
- シャッフルには quota が設定されており、そのサイズ?を超えるとディスクに書き出される
- quota は追加プランで上限を上げることができる
- シャッフルが実現する大規模 JOIN
- JOIN の際も JOIN のキーでまとめ上げるためにシャッフルが起きる
- JOIN 対象のテーブルそれぞれ個別にシャッフルがまず起きる
- クエリプランには
shuffle
は見当たらない?
- クエリプランには
- Output を見ると最後の JOIN ステップ時点ですでに多数のスロットが(不要になって)キャンセルされているのがわかる
- ブロードキャストによる小規模 JOIN
- JOIN 対象のテーブルどちらか一方が十分に小さい場合、シャッフルすることなく各 shard のメモリに全件コピーして結合ができる
- WHERE 句による絞り込みでテーブルを小さくする
- シャッフルが起きないので速い
- 再パーティショニング
- 100億レコードもの GROUP BY / ORDER BY になるとシャッフル後に再度必要な shard の割り当てが起きる
- 偏ったデータの JOIN
- 特定の shard に大量のデータが行ってしまい、平均実行時間と最大実行時間の乖離が大きくなる
- 両方のテーブルの再シャッフルが必要なので再ディスパッチが適用できない
- 少しトリッキーだがクエリを分割して、一つでヘビーキー(偏りの激しいところ)をやってもう一方でその他を処理する方法がある
- 大規模な ORDER BY
- ただの ORDER BY は最終的にマスタノードが全ての値を保持して並び替える必要がある
- LIMIT 句を足すことで、下位の shard による処理の時点で LIMIT 数以上のデータは drop できる
- Stage 1 の output が激減した
- Resource exceeded 問題の解決策
- 大きすぎるデータを生成してしまう場合 -> クエリを修正する
- ARRAY_AGG / STRING_AGG
- PARTITION なしの分析関数
- CROSS JOIN(容易に爆発する)
- 巨大なデータの並び替え -> LIMIT を追加する
- データが偏っていて特定の shard が過負荷になる場合 -> クエリを分割する
- 大きすぎるデータを生成してしまう場合 -> クエリを修正する
- 様々なカウント
- GROUP BY + COUNT(*)
- 遅いし複雑なクエリに組み込むのは難しい
- 正確な値を返す
- COUNT(DISTINCT x)
- 遅いがスケーラブル
- 標準SQLでは正確な値を返すようになった(それまでは概算値だったので問い合わせが多かった)
- APPROX_COUNT_DISTINCT(x)
- とても速い
- 概算値だがエラーレートは 0.3% ~ 1% 程度
- HyperLogLog の Google による拡張である HyperLogLog++ で実装されている
- GROUP BY + COUNT(*)
- 複雑なクエリの最適化
- WHERE 句で絞り込むキーの対象テーブルを入れ替えるだけでクエリが高速化するデモを実演
- JOIN ON a.xxx = b.xxx で、a.xxx を正規表現マッチングで絞り込んでいたものを b.xxx に対象を変更
- WHERE 句で絞り込むキーの対象テーブルを入れ替えるだけでクエリが高速化するデモを実演
フルサイクルのデータ分析を実現!〜 GCP のサーバーレス・データ分析基盤を活用したデータサイエンスの実践
URL : Google Cloud Next | October 11-13, 2022
- Cloud Pub/Sub をデータバッファとして利用
- 下流への流量調整ができる
- 移動平均などをリアルタイムで Dataflow で計算する
- BigQuery に適宜データがストリーミングインサートされるのでバッチ処理を待たなくてもクエリを投げられる
- PubSub に結果を再度書き出して他のシステムへの連携にも使える
- Firebase の realtime database
- 一昔前は Google もルールベースの検索をやっていた
- Giants の検索結果を場所で変えるなど
- 今は RankBrain による機械学習ベースに移行された
- 空調 On/Off した時のコストを予測して、トータルコストを下げるのがゴール
- Dataflow でルールベースの処理はできる、が閾値をどうすれば良いかわからない -> 機械学習にかけて算出
- Dataflow 内の閾値をチェックしていたコードを InferenceParDo を呼ぶように返るだけで CluodML の REST API を叩くことができ、予測結果を取得することができる