べにやまぶろぐ

技術寄りの話を書くつもり

Google Cloud Next '17 in Tokyo の機械学習系セッション聴講メモ(Day 2) #googlenext17

f:id:beniyama:20170615172340j:plain

今回も引き続き Google Cloud Next ‘17 in Tokyo (2日目)で聴講した機械学習系(と BigQuery)のセッションのメモを書いていきます。

初日分はこちらです。

beniyama.hatenablog.jp

※ 聞き違いや理解不足から一部不正確な情報が載っている可能性がありますがご容赦ください。


Google 技術専門集団による「GCP 活用事例と今後の可能性」と「TensorFlow とエンジニアのパフォーマンスを最大限に引き出す Cloud ML Engine」

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

GCP 活用事例と今後の可能性

  • GCP 採用メリット : 金額が安い
    • ある事例では AWS から 30% 削減できた
    • リザーブドインスタンスからの脱却(課金単位が時間から分単位へ)
  • 事例紹介1:店舗内顧客行動のデータ化
    • POS では売れなかったデータはわからない
    • 機会損失を減らして売上をあげたい
    • Web カメラで店内を解析して顧客行動を分析
      • BigQuery で分析
      • OpenCV も使って滞在時間や購買意欲を実映像の上に可視化
      • ラズパイで高解像度動画を1時間単位でバッファリング
        • Web カメラを GCP へ直接アップロード
        • Windows PC の代替・省スペース化など
    • システム構成
      • GCE : 解析ソフト搭載インスタンス
      • BigQuery : 解析後データ保持・分析
      • GCS : システム導入前過去データ解析
      • GAE : 管理 Web システム
    • 売上10%アップ
    • 効果的な POP 配置把握と欠品対応
    • 来店数と売上の関係性の裏に売れなかった理由の仮説を見える化
    • 今後の方向性
      • 店舗別分析と立地などの属性をモデル化(店舗ごとの売上予測など)
      • 時間別の人員配置と行動の定義化
      • 棚割の見直しと欠品削減
  • 事例紹介2:次世代冷蔵庫開発と食材消費動向把握
    • コモディティ化する白物家電に価値の創造と新しいビジネスモデルの模索
    • 前面上下2画面のディスプレイ付き冷蔵庫を提案
    • Android
      • バーコードなどから商品データを把握
      • 連携企業間とのアプリ・コンテンツ共有
    • システム構成
      • GAE : 全端末管理・コンテンツ事業者とのサービス連携・管理 Web システム
      • GCS : メモ・動画・静止画保存
      • BigQuery : ログ保存・分析
    • 効果
      • コンセプトモデルとして新しい世界観を創出
      • 食品消費モデルと POS 連携把握
      • 同型他モデルの販売とパートナーエコシステムの開拓
  • GCP を使うと実証実験が簡単にできる
    • 新規システムに効果を発揮しやすい
    • 技術者をリスペクトしてほしい、彼らが作りたいものをまず作ってもらいたい
    • まずは何も考えずデータを溜めるようにしておいて、それ自体に価値をつけることは先送りできる
      • 機械学習への移行
    • お客様とエンジニアが楽しそうに共創していたのが印象的だった
      • 採用するリスクよりも採用しないリスクを気にしたい

TensorFlow とエンジニアのパフォーマンスを最大限に引き出す Cloud ML Engine

  • TensorFlow
    • ディープラーニングなどのアルゴリズムを実装するための数値計算ライブラリ
    • GPC を活用した高速計算や分散処理
    • GPU クラスタを整えたりデプロイするようにしたり自分で動作環境を作るのは辛い
  • Cloud ML Engine
    • Tensorflow を動かす最適な環境を用意してくれる
      • GPU を使った計算環境
      • Google の高速なネットワークを使った分散処理
      • Tensor Processing Unit (TPU)
        • Tensorflow 専用のプロセッサ(2017/05 発表)
        • NN の計算に特化 (15 ~ 30倍の性能、電力性能ひ 30 ~ 80倍)
        • テスター募集中
    • データエンジニアがやるべきことは Tensorflow のコードを Cloud ML Engine にサブミットすることだけ
      • そのあとトレーニング
    • 学習データや学習後のモデルの保存は GCS へ
    • 出来上がったモデルを ML Engine にデプロイすると Web API を通して使える
      • もちろんスケールする

Google Cloud Dataflow で実現するストリームおよびバッチデータ処理

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • Cloud Dataflow とは
    • ストリーム及びバッチの統合処理モデル
    • OSS になっている (Apache Beam)
    • オートスケール及び動的タスクシャーディング
    • BigQuery / Bigtable / GCS など GCP のエコシステムと容易に連携
  • データ処理の歴史
    • MapReduce : シンプルスケーラビリティ(2003)
    • FlumeJava : パイプライン・最適化・オーケストレーション(2010)
    • MillWheel : ストリーミング・低遅延(2013)
    • Cloud Dataflow : バッチ + ストリーム統合モデル・マネージド実行エンジン(2015)
  • Dataflow の特徴
    • マネージドサービス
      • フルマネージドの自動構成
      • 最適な実行パスになるようにグラフを最適化
        • ビジネスロジックとチューニングのためのコードが混じりがち
        • メンテナンス性が下がるし、プログラマはビジネスロジックに集中したい
      • ジョブ実行中のオートスケール
        • Dataflow のフェーズによって必要なリソースは変わってくる
      • 上部実行中の動的ワークリバランス
        • 遅いノードに引っ張られて全体が遅くなるので、時間のかかっているタスクを他のノードに分け与える
    • プログラミングモデル + SDK
      • The world beyond batch : Streaming 101
        • 現代はスピードが増し、データがますます増えてきた
        • データは本質的には次から次へ発生する
        • システム処理の都合上、教会を定義するというのが従来の手法(バッチ処理)
          • bounded data / unbounded data
        • これらの課題を処理するための新しいモデル
  • データは無限に発生し、処理に遅延が発生することもある
    • 8:00に発生したイベントが13:00に処理されるなど
    • Element-wise(要素ごと)の変換
    • ウインドウに分割しての集計
      • イベントが発生した時間と集計タイミングが合わない(ウインドウから取りこぼす)ことがあり得る
      • 遅れてきたデータをどう扱うか?という問題
  • イベントタイムと処理タイムという概念
    • イベントタイム: イベントが実際に発生した時間
    • 処理タイム: イベントがシステムによって確認された時間
  • Dataflow モデル4つの質問
    • What : 何を計算するのか
      • Element-wise
      • Aggregating
      • Composition
    • Where : どこでデータを区切るのか
      • Fixed window
      • Sliding window
      • Session window
    • When : いつ処理をするのか
      • Watermark
        • イベントタイムがどこまで処理されたかを示すタイムポイント
        • ヒューリスティックに設定する(ウォーターマークよりも古いデータは現れない)
          • 早すぎるといくつかのデータを取り損ねる
          • 遅すぎると結果の出力が遅れる
        • withEarlyFirings と withLateFirings で調整ができる(on-time / early / late)
    • How : どのようにデータを補正するのか
      • 遅れてきたデータを足したり、無視したり、あるいは正規のデータとしたりできる
  • クラシックバッチ / ウィンドウバッチ / ストリーミング / ストリーミング + 補正 の4パターンが表現可能
  • 事例紹介 : 仮想通貨 c0banを用いた動画広告サービス
    • 広告ダッシュボード (動画表示回数 / 完全視聴数 / ユーザ数 / 評価)
      • Firebase -> BigQuery を最初考えたが直接ダッシュボードから参照するようにするとコスパが悪い
      • フロントに App Engine : Pub/Sub へのアクセスを API 化するために置いた
      • Cloud SQL : アプリ用のサマリデータを保存
      • BigQuery : 社内向けの全件データを保持
    • Dataflow モデル
      • 何を : Impression 数:コンテンツとアクションでキーづけられた整数の合計
      • どこで : 1時間のイベント時間の固定ウィンドウ内
      • いつ (の時点で結果を実体化するか) : 1分ごとに更新し、2時間の遅延まで認める
      • どのように(結果の更新を扱うか) : 遅れてきたイベントも加算する
    • 全てのデータは Pub/Sub でやってきてそれを途中で2経路のフローに分岐させて処理
      • 一つはダッシュボード用に集計値としてサマるフロー
      • もう一つは BigQuery 用にそのまま突っ込むフロー
    • 設計したパイプラインはチューニングの必要がないくらい優秀だった
      • Pub/Sub に 8,000 リクエスト/秒(クオータ制限上限)を投げた
      • 1インスタンスから4インスタンスに自動スケールアウト
      • 3,200リクエスト/秒 -> 14,300リクエスト/秒にリニアにスケール
        • 参考値としてバルスのツイート 14.3万ツイート
  • Apache Beam 自体は Apache Spark を始め色々な実行環境で動作する
    • Beam = Batch + Streaming
    • Cloud DataFlow では Python SDK も動作する

BigQuery と Cloud Machine Learning : 大規模ニューラル ネットワークによる予測

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • BigQuery
    • 5,000 ~ 10,000 台のディスクで 1TB を1秒でスキャン
    • Google Cloud : The Datacenter as a Computer
      • データセンターを超並列計算機として扱える設計担っている
      • Jupiter : Google が開発したネットワークファブリック
        • 10G x 10万ポート = 1.2 Pbps
        • マイクロ秒のレイテンシでサーバを集約
    • 数百台のサーバとペタビットネットワークによる超並列処理クエリ
      • Datacenter as a computer だからこそ実現できる技術
    • データの民主化
      • 昔はアプリケーションエンジニアですら(DB が死ぬことがあるので)気軽に DB にクエリを投げさせてもらえなかった
      • BigQuery ではコスト上限さえかけておけば非効率なクエリもどんどん実行してOK
  • Google Cloud Datalab
    • Jupyter に GCP 周りのコマンドを統合しているので BQ のクエリもコード中に書いて呼び出せる
  • Queryit Smart : シグネチャによるスマートな分析
    • 特徴ベクトルによるドキュメントの類似検索
    • stack overflow の投稿 1000万件に対して、正規表現マッチングを使った no index での比較が18秒で実行可能
    • 何をやっているのか
      • 単語に分割
        • 前処理も UDF で行なっていて、これも BigQuery を使うと速い
      • シグネチャ(特徴ベクトル)を抽出
        • tf-idf で重要そうな単語を抽出、数秒でできる
      • シグネチャ間の類似度を計算する UDF を定義
        • コサイン類似度を計算
        • SQL では書きにくいところを UDF(JS) で記述
      • UDF を使って類似のドキュメントを検索
  • ML Engine でスマートな分析
    • 人の感覚や直感を SQL で表現するのは難しい
    • HyperTune というパラメータチューニング支援もある
    • BigQuery を使った画像検索の例
      • Wikimedia の画像100万点をGCSにインポート
      • ML Engine で CNN(VGG16) を適用
      • ラベル分類をしてしまう直前(最終FC層)の特徴量をそのまま突っ込んで類似度をUDFで比較
  • レンタルサイクルの需要予測
    • UDF で多層パーセプトロンを定義し、GCS に保存したモデル(重みとバイアス)を読み込んでクエリで使うことまでできる
    • GCS から読めるデータサイズは 5MB まで
    • UDF にしてしまえば誰もが機械学習のモデルをクエリから利用することができる
  • Queryit Smart はソースコードを GitHub から入手できる

BigQuery の先進機能 : クラウド データウェアハウスの未来を開く鍵

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • Google BigQuery Analytics の著者
  • クエリが遅い場合があることに気づいても(ブラックボックスなので)改善方法がわからない
  • 実際のクエリ実行やクエリプランを通してアーキテクチャを実践的に解説
  • Dremel のアーキテクチャ
    • 発表当時は static tree だったが今は dynamic tree になっている
    • Colossus というカラム指向の分散ストレージを使っている
  • UI に表示されるクエリプランがチューニングにおいて有益
    • SELECT COUNT() の例では partial count が shard ノードで先に行われてそのあと sum される
    • 実行計画でも2ステージに分かれていることが確認できる
    • Stage timing に wait / read /compute / write が表示されている
    • 分散実行なのでノード間の平均実行時間や最大実行時間がどれだけ違うかを確認できる
    • Input / Output rows が表示されていてどれだけのスロットを消費したかがわかる
      • Scheduler がリソース割り当てを行う
  • BigQuery におけるスロット
    • スケジューリング対象となる BigQuery の並列実行可能な作業単位
    • スループットが指標
    • ジョブの最中に不要になったらキャンセル、また遅延やエラーの場合はリスタートされる
    • デフォルトでは1プロジェクトについて 2,000 スロットが割り当てられる
    • プロジェクト間・クエリ間でそれぞれフェアスケジューリングされる
    • さらにクエリのステージごとにスケジューリングされ、特定のジョブに対するスロット数を知るのはデータとクエリの複雑度に依存するため難しい
  • シャッフル
    • カーディナリティの高いデータの集計においてシャッフルで近い値を同じノードに格納するようにすることで中間レイヤーの並列度を上げる
      • 中間レイヤーの集計対象を区切る
      • ORDER BY + LIMIT の場合は中間レイヤーの中で件数を絞れる(N件以上返す必要がない)ためスケーラブル
    • クエリプラン上では WRITE TO __SHUFFLE / BY HASH のように何を key / value としてシャッフルが起きているか確認できる
  • マルチシャッフル
    • GROUP BY のキーが複数になると HASH キーが複数になる
    • Stage 1 の compute が減る一方で wait する時間が長くなる
      • Stage 2 の compute は当然増える
  • シャッフルの tips
    • ステージ N から N+1 へのマッピングが統計的に決定できない場合必ずシャッフルが行われる
    • シャッフルは(Jupiter や Colossus などの恩恵を受けて)超高速に行われる
    • シャッフルには quota が設定されており、そのサイズ?を超えるとディスクに書き出される
    • quota は追加プランで上限を上げることができる
  • シャッフルが実現する大規模 JOIN
    • JOIN の際も JOIN のキーでまとめ上げるためにシャッフルが起きる
    • JOIN 対象のテーブルそれぞれ個別にシャッフルがまず起きる
      • クエリプランには shuffle は見当たらない?
    • Output を見ると最後の JOIN ステップ時点ですでに多数のスロットが(不要になって)キャンセルされているのがわかる
  • ブロードキャストによる小規模 JOIN
    • JOIN 対象のテーブルどちらか一方が十分に小さい場合、シャッフルすることなく各 shard のメモリに全件コピーして結合ができる
    • WHERE 句による絞り込みでテーブルを小さくする
    • シャッフルが起きないので速い
  • 再パーティショニング
    • 100億レコードもの GROUP BY / ORDER BY になるとシャッフル後に再度必要な shard の割り当てが起きる
  • 偏ったデータの JOIN
    • 特定の shard に大量のデータが行ってしまい、平均実行時間と最大実行時間の乖離が大きくなる
    • 両方のテーブルの再シャッフルが必要なので再ディスパッチが適用できない
    • 少しトリッキーだがクエリを分割して、一つでヘビーキー(偏りの激しいところ)をやってもう一方でその他を処理する方法がある
  • 大規模な ORDER BY
    • ただの ORDER BY は最終的にマスタノードが全ての値を保持して並び替える必要がある
    • LIMIT 句を足すことで、下位の shard による処理の時点で LIMIT 数以上のデータは drop できる
      • Stage 1 の output が激減した
  • Resource exceeded 問題の解決策
    • 大きすぎるデータを生成してしまう場合 -> クエリを修正する
      • ARRAY_AGG / STRING_AGG
      • PARTITION なしの分析関数
      • CROSS JOIN(容易に爆発する)
    • 巨大なデータの並び替え -> LIMIT を追加する
    • データが偏っていて特定の shard が過負荷になる場合 -> クエリを分割する
  • 様々なカウント
    • GROUP BY + COUNT(*)
      • 遅いし複雑なクエリに組み込むのは難しい
      • 正確な値を返す
    • COUNT(DISTINCT x)
      • 遅いがスケーラブル
      • 標準SQLでは正確な値を返すようになった(それまでは概算値だったので問い合わせが多かった)
    • APPROX_COUNT_DISTINCT(x)
      • とても速い
      • 概算値だがエラーレートは 0.3% ~ 1% 程度
      • HyperLogLog の Google による拡張である HyperLogLog++ で実装されている
  • 複雑なクエリの最適化
    • WHERE 句で絞り込むキーの対象テーブルを入れ替えるだけでクエリが高速化するデモを実演
      • JOIN ON a.xxx = b.xxx で、a.xxx を正規表現マッチングで絞り込んでいたものを b.xxx に対象を変更

フルサイクルのデータ分析を実現!〜 GCP のサーバーレス・データ分析基盤を活用したデータサイエンスの実践

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • Cloud Pub/Sub をデータバッファとして利用
    • 下流への流量調整ができる
  • 移動平均などをリアルタイムで Dataflow で計算する
  • BigQuery に適宜データがストリーミングインサートされるのでバッチ処理を待たなくてもクエリを投げられる
  • PubSub に結果を再度書き出して他のシステムへの連携にも使える
    • Firebase の realtime database
  • 一昔前は Google もルールベースの検索をやっていた
    • Giants の検索結果を場所で変えるなど
    • 今は RankBrain による機械学習ベースに移行された
  • 空調 On/Off した時のコストを予測して、トータルコストを下げるのがゴール
    • Dataflow でルールベースの処理はできる、が閾値をどうすれば良いかわからない -> 機械学習にかけて算出
    • Dataflow 内の閾値をチェックしていたコードを InferenceParDo を呼ぶように返るだけで CluodML の REST API を叩くことができ、予測結果を取得することができる

Google Cloud Next '17 in Tokyo の機械学習系セッション聴講メモ(Day 1) #googlenext17

f:id:beniyama:20170614221800j:plain

ザ・プリンス パークタワー東京で開催中の Google Cloud Next ‘17 in Tokyo に参加しています。

cloudnext.withgoogle.com

今回は機械学習周りのセッションを集中して選んでみましたのでそのメモです(裏番組になっていて残念ながら聴講できなかったセッションも多数)。ほとんど殴り書き&一部不確かな記述があるかもしれませんがご容赦ください。


事例から学ぶ機械学習のいま ~ 専門家不要、自社独自の機械学習サービスを構築できる MAGELLAN BLOCKS の事例を元に、その方法と今起きている変革をご紹介します。

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • 機械学習サービスを作るためには業務知識と機械学習の知識が必要
    • 専門家に頼むと精度が出てもその後どうしたら良いかよくわからない
    • 人の考えるをもっと自由に、誰もが使える機械学習に
      • 業務知識とデータがあればそれで十分という世界を目指す
  • 基本的な機能
    • データを集める : IoT ボード
    • 学習させる : Machine Learning ボード
    • ブロックを繋いで結果を導く : BigData ボード
  • ブロックをつなげてバッチ系の処理とかも表現できる
  • フローに名前をつけて外部からも呼び出せる
    • API ブロックで外部のプログラムからも呼び出せる
    • 結果を Salesforce とかにセットできる
  • 過去のデータをブロックに見せると予測を始める
    • 数値データだと1,000件(3年分)もあれば精度は出る
  • ML ボード : 数値回帰, 数値分類, 画像分類
  • 電波強度をラズパイで計測して場所による人の密度を予測する回遊予測のデモ実演中
    • 結果は緯度経度の二次元で出る
  • 隠れ層の数や Learning モデルなどのハイパーパラメータの調整のために中と外の二段構えの機械学習
    • トレーニングの最大施工数なども指定できる
  • 画像分類の場合はカテゴリ名のフォルダの下に画像を入れるだけでラベルが振られる
  • 事例紹介
    • カイロの売上予測
      • N日前の気温などの条件も一緒に入れて時系列の変化も考慮して予測した
      • 1店舗あたり1個程度の予測の差の精度で出た
    • J リーグ観客動員数の予測
      • 2012 - 2014年前半戦までのデータを学習して2014年後半を予測
      • 少ない因子でかなり精度が高くでた
        • 曜日やキックオフの時間などだけでわかるということは、スタジアムまで来るファンが固定化されてしまっているということ
    • 株価予測
      • 関連因子を元に翌日の株価を予測した結果、500円台の銘柄で誤差1.2円
      • クライアントはドメイン知識があるのでそのクライアントにしかできない
    • 節電効果予測
      • 節電装置設置前に節電効果を予測
      • 誤差0.03%
    • 契約予測
      • 金融商品のDMを送る前に契約しそうな人を予測
      • 96%の予測を達成
  • Google Assistant (api.ai) 連携
    • 会話する BLOOKS
    • どの項目があれば質問が確定するかを設定すると、足りない情報を会話で聞くことができる

Google におけるディープラーニングの活用と Google Cloud Platform の機械学習サービス

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • Google が社内で使っている機械学習とディープラーニングの紹介
    • @enakai00 さん
    • 一昔前の Google のコンセプトであった One click で〜 というのはもう古い
      • 音声入力、キーワードなしのクエリ(Search result query)の時代
  • メディアで騒がれている AI は技術の話ではない
    • 知性を持っているかのような機能を提供する製品・サービス
    • いずれにせよ予測の機能が重要なのは間違いない
      • 従来型の機械学習がまさにそれ
      • ディープラーニングと機械学習の違い : 非構造型のデータに高い予測性能を発揮する
  • 事例紹介
    • DeepMind の音声合成技術(WaveNet)
      • Dilated Causal Convolutional Neural Network を用いてデジタル音声データを0から作り上げる
      • 音のつなぎ合わせではない
      • 人間のアナウンサーの声に次いで次に自然だと評価を得ている
        • 人間の声は体調的なコンディションにも左右されるので、将来的に合成音で安定的なアナウンスがされるようになるかもしれない
      • 声質と色々なテキスト文の組み合わせを作ることができる
        • 異種の言語を足し合わせてSFに出てくるような実在しない言語も生成できる
    • Gmail Smart Reply
      • メールの返答文を(3択くらい?)勝手に生成してくれる
      • スマホの Gmail の2割で既に使われている
    • データセンターの冷却設備の動作を改善
      • (Google のエンジニアがチューニングしていた状態から)DeepMind のチームが深層学習を適用したところ冷却コストが40%低下
      • データセンター電力効率(PUE)が15%改善
  • GCP が提供する機械学習サービス
    • 独自モデルの学習
      • TensorFlow
      • Cloud Machine Learning Engine
    • 学習済みモデルの API サービス
      • Cloud Vision API
        • 写真の中のテキストを認識
        • 画像アップロードサービスで不適切かどうかの判別を自動化できる
      • Cloud Speech API
      • Cloud Jobs API
      • Cloud Video Intelligence API
        • まだベータ版だが、何が動画に現れているかを認識できる
        • Shot 分析(シーン別分析)もできる(動画内の情報検索に活用できる)
      • Cloud Natural Language API
        • ocado (イギリスのオンラインスーパーマーケット)
          • クレーム対応などの CS 業務を自然言語分析にかけて感情判定をしている
          • ようやく実用レベルに達したという評価を得ることができた
  • CyberAgent アドテク本部 AI Lab が Vision API を活用
    • 広告代理店でのデータ分析
    • Versus
      • 次世代ブランド戦略室 x AI Lab
      • ブランドリフトに寄与するクリエイティブの構成要素を分析して動画制作の意思決定をサポートするサービス
      • 動画・画像広告の持つ高次元な要素を効率的に抽出するために Cloud Vision API を使用
    • 導入以前は Youtube Brand Lift Survey を使って、配信した動画広告について数百種類の要素を人出でタグ付けしていた
    • 導入後は Cloud Vision API とのハイブリッドになり、人手でしか難しいタグ付けを手作業で処理
  • Cloud Machine Learning Engine
    • 事例紹介
      • キューピー x ブレインパッド
        • 製造ライン上の不良品を画像から検出
      • オークネット x ブレインパッド
        • 顧客が撮影した写真から自動車の車種・製造年を検出するシステム
      • アクサ損害保険
        • ニューラルネットの活用で重大事故を起こすドライバーの予測精度を78%に向上
        • 従来型の機械学習は今までもやっていたはずだがニューラルネットで予想以上に予測精度を向上できた
          • 構造化データにも効果を発揮したという事例にもなった
  • フルサイクルの機械学習プラットフォーム
    • GCP のサーバレス・データ分析基盤を活用したデータサイエンスの実践のセッションが明日開催される

No-Ops で大量データ処理基盤を簡単に構築する - Google Cloud Platform で実現する次世代データ処理基盤

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • Big Data + No-Ops
    • Google では Big Data とは言わない。ただのデータ。
  • Google には世界に10億人のユーザを抱えるサービスが8つある
  • Flume Java と Millwheel は Dataflow として公開されている
  • 基盤を作るとメンテナンスしなければいけないので No-Ops で分析に費やす時間を増やす
  • データ処理基盤の要素 : 収集 / 保存 / 処理・分析 / 可視化
  • Cloud 2.0 と 3.0
    • Cloud 2.0 はパブリッククラウドの上に OSS を入れて自分で運用する
    • Cloud 3.0 は No-Ops
      • Cloud Spanner : NewSQL (OLTP において今までにない DB)
  • まず BigQuery から始める
    • GCP に手をつけるときに一番やりやすい、Google の特徴的なサービス
    • フルマネージドの No-Ops データウェアハウス
      • 1PBのデータに対して秒間5TBのスループットで処理が可能
      • マルチゾーン・マルチリージョン
  • データソース
    • バッチロード
      • Cloud Storage Transfer Service
      • gsutil
    • ストリーム API
    • Google Analytics 360 suite
    • Firebase
    • Google Stackdriver
      • システムログ
      • アプリケーションログ
    • BigQuery Data Transfer Service
      • まだ Beta
      • DoubleClick や Adwords などの広告系サービスのデータをインポート可能
  • 可視化
    • Cloud Datalab
    • Data Studio
  • データ処理エンジンとしての BigQuery

    • 構造データ
      • スキーマ定義されている
      • 文字列を SQL や UDF で分解することは可能
    • バッチ処理
      • クエリ処理のトリガーは API でキックできる
    • SQL
      • UDF で JS を記述できるが本質的には SQL エンジン
  • Cloud Dataflow

    • ストリーム及びバッチの統合処理モデル
    • OSS 実装もある(Apache Beam)
    • 新たなデフォルト (MapReduce / FlumeJava / MillWheel が合わさって DataFlow を構成する)
  • Cloud Dataproc

    • Hadoop, Spark のマネージドサービス
    • BigQuery / Bigtable / Cloud Storage のコネクタ
    • 必要なときに必要なだけ立ち上げるクラスタ
    • 高速(90秒でクラスタが起動)
    • 低コスト(分単位課金、プリエンティブル(最長24時間までしか動かない)VM)
  • Cloud PubSub

    • スケーラブルで信頼性の高いメッセージングミドルウェア
    • 多様な配信方式
    • Push 配信及び Pull 配信
    • グローバルリソース
  • リファレンスアーキテクチャ

    • Cloud Dataflow はバッチでもストリームでも前処理できる
    • No-Ops で自動的にスケールできる
  • リクルートテクノロジーズの事例紹介

    • オンプレミス Hadoop 基盤の活用と課題
      • スタック : Sqoop / Hbase / Impala / Hive / Hadoop / Python
      • 活用シーン
        • リクルートの Web サービスへのレコメンド機能の提供
        • BI/分析
      • 課題
        • メンテナンスコスト
          • Hadoop のバージョンアップ対応
            • 既存ジョブのテストやその環境コスト(3万クエリのテストが必要)
            • エコシステム間の依存関係で影響範囲が大きくなることも
              • バージョンアップして Impala の関数を使いたいがために Hive も上げる必要があったりする
              • インフラ運用の4割くらいの業務ボリュームを割いてしまっている
          • パッチ適用対応
            • 新規クエリや従来クエリのデータ量や質の変化で未知のバグを踏むこともある
        • スケーラビリティ
          • 拡張リードアイム
            • ノード追加やクラスタ増設
            • サーバやラックの設置で2、3ヶ月かかることもある
              • 用意さえされれば provisioning は自動化されているが、そもそも突然要求リソースが増えたりする
          • 機械学習処理の拡張性
            • Python の機械学習処理の複数サーバ分散の手動運用
              • 主に多数のユーザやコンテンツに対する predict() が処理増大
      • Ops ばかりやっているが、本当であればデータ活用の方に回したい
    • GCP 活用による課題解決
      • ハイブリッド No-Ops クラウドサービス
        • フラットな視点で最適なクラウドサービスを選択
        • 以降コストが見合わないなどの環境はオンプレミス
          • Hive + Impala -> BigQuery
          • BigQuery と DataFlow 中心の構成に
      • なぜ BigQuey なのか
        • バージョンアップなどの大幅な仕様変更がなさそう。標準SQLで開発可能にもなった。
        • データサイエンティストが使いやすくて何より速い
          • Hive で1時間の処理が BigQuery で10分
          • 本来処理したくて諦めていた5倍のデータ処理もいけるようになった
      • なぜ DataFlow なのか
        • 分散処理インフラの運用がほぼ不要
          • リソースの起動・停止・拡張縮小・リソース使用量やログ収集管理など
        • predict() のところを並列処理できる
        • Python が GA になった
    • Hive -> BigQuery の書き換えはノウハウをパターン化
      • Hive にしかない機能例 : Dynamic partition / STRUCT / LATERAL VIEW
      • MapReduce や UDF は幸いほとんどクエリ化できた
    • BigQuery の課題
      • データの移行
      • 大きめの CROSS JOIN
    • Dataflow の課題
      • データ量が増えた場合の拡張をどのように自動化するか
      • 現在は worker 数を指定しているため要手動チューニング

Google のデータサイエンティストが語る現場で使える機械学習入門

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • 機械学習はこう動く
    • ルールベースによる分類との違い
      • 単純なルールで分けられないデータに弱い。例外が来るたびに書き直さなければいけない
      • 機械学習はデータに合わせて自動的に改善する
    • 自動的に縦横問わず線を引く(単純パーセプトロン)
    • 曲線や縁で最適な線を引き直すこともできる(SVM など)
    • 学習データをスキャンしていきながら分類器を学習させていき、テストデータで検証
      • 誤判定が発生したときにフィードバックを与えてモデルパラメータを変更する
  • 8つの機械学習のステップ

    • 機械学習を使うかどうかの判断
      • 例えばEC サイトのレコメンドシステムにディープラーニングを使う
        • 毎日働いてくれるし売り上げも上がっていれば中身がブラックボックスでも許せる
        • アプトプットが日次より頻繁というのも一つの判断要因
    • 目的:機械学習に何をさせるのか?
      • 解きたいパズルは何か : CPA の最小化、CVR の最大化など
    • データを集める
      • 十分な量のデータをできるだけ自動で集める
      • 本当に必要なデータか厳選する
    • データの前処理
      • 一般的にデータはそのままではただのゴミの山、きちんと分類整理しなければ役に立たない
      • 基本的にはどんなデータでも列指向に持ち直す必要がある
      • 前処理は全リソースの8割以上を費やす
    • モデル学習とその方法
      • 機械学習も万能ではない
    • モデルのチューニング
      • チューニングしてこそのアウトプット
    • 汎化性能
      • 過去データにぴったり合わせ過ぎると未知のデータに合わなくなる
      • ノイズに振り回されず真のシグナルにもっともよくフィットすることで未知データに対して高い精度を発揮する度合いが汎化性能
    • 検証
      • A/Bテスト、pre/post テストで効果を見る
      • もちろん ROI も重要なので全体感に注意
    • 改善サイクル
      • 消費者心理の変化などでかつて有効だった機械学習モデルも割と簡単に使い物にならなくなる
      • 改善サイクルを回し続けてこその機械学習
  • GCP x ML for business

    • 典型的な use case
      1. 新規見込みユーザ検出
      2. イメージ・ドキュメント認識・分類
      3. 2 を受けてのレコメンド
      4. 異常値検出
      5. 対戦ゲームの自動プレイ
    • BigQuery x TensorFlow
      • Datalab から BigQuery を呼ぶ
      • シャッフルして最初の N 件を学習に使う
      • tensorflow 1.2 が出てだいぶシンプルに記述できるようになった
        • DNNClassifier
        • hidden_units を空にすると単層パーセプトロン
          • MLP : multi layer perceptron
  • IDOM (ガリバー) の ML プロジェクト
    • GA 360 + CRM データ > BigQuery > Datalab
    • 来店確率を予測して店舗訪問の可能性の高い顧客に優先的にリーチした
  • RLS の CET (Capture Every Thing)
  • GMO Ad Marketing など(裏のセッション)

顧客関心をより高める AI(機械学習)と API の活用

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • AI or IA (Intelligence Amplification : 知能増幅)?
    • 自動運転 vs ブレーキアシスト
    • いずれにせよこれらを支えるのが機械学習
      • 人工知能:物事を賢くする技術
      • 機械学習:学べるコンピュータの作り方
      • ニューラルネットワーク:機械学習の1手法
  • 継続的な学習:強化学習
    • AlphaGO も AlphaGO 同士が戦って強化している
    • Street view の内部情報(看板の文字など)の認識にも使っている
    • ロボットがものを掴む動作を繰り返し行って学習していく
  • Google 翻訳もディープラーニングベースのものに変わった
  • AI x API による機械学習の民主化
    • 自分で作った AI と Google の AI を API でつなげる
    • それをさらに世に出して他の人に使ってもらうこともできる
  • AI は難解なアプリケーションのためだけにあるわけではない
    • 全てのアプリケーションは将来的に学習するようになる
    • 知性を全てのアプリケーションと API に組み込まなくてはいけない
  • APigee
    • いろんなサービスを API 化することを助けるサービス
      • ボットからの攻撃(呼び出し)を防御してくれたりする
    • 新しい API(エンゲージメント)層
    • 利用するサービスの API 化だけでなく作り出した機能の提供のための API 生成にも使える
  • rMark Bio 社
    • サイロ化されたヘルスケアデータへの API からのアクセスを実現
  • デモ
    • 300行のJSのコードでチャットのデモが作れる
    • Firebase でログインなどの機能も実現
    • 会話応答だけでなく Translation API を使って翻訳をしたり Vision API を使って OCR 的な認識による画像からの文字起こしもできる
  • 技術は進化するほどユーザ視点では簡単になっていく
    • ユーザ企業は顧客と組織にフォーカスしていくことができる

API.AI と Cloud Speech、チャットボットで実現する、会話型ユーザー エクスペリエンス

URL : Google Cloud Next '17 in Tokyo | 6 月 14 日 ( 水 ) ・ 15 日 ( 木 ) | スケジュール

  • conversation API と speech recognition についてのお話
  • api.ai は Google に買収された会社
    • 自然言語を構造化データに変換する
    • Cloud function と組み合わせると強力
  • Slack や LINE とも連携できるのでそれぞれのチャットボットを開発可能
  • Agent をまず作る
    • そして Entity (Synonym を適当にいくつか入れて単語の表現に幅を持たせられる)
  • Intent : 対話制御
    • Entity を勝手に判断してくれる
    • 意味的に近い文章は(機械学習で)同じ文章として寄せてくれる
      • 離れたやつは Training から history を開いて自分でチューニングできる
  • プロンプトを出して質問の確定に足りない情報を聞くことができる
  • 結果はJSON で吐き出せる
  • Context を使って複数の応答を行う会話を生成可能
  • 音声の文字起こしデモ
    • meat と meet も文脈を判断して正しく書き分けられる
    • 感情分析の API もあるのでその言葉がどれくらいポジティブ or ネガティブか判断可能
      • つまらないセッションすぎて終わるまで待てない -> ネガティブなスコア
      • このセッションすごい楽しい、特にデモ -> ポジティブなスコア
  • api.ai も Speech Recognition API も今日から公式に日本語対応

最近の登壇記録

備忘録的に書いとく。

第3回スタートアップRails勉強会

「データ基盤で超える開発の壁」

connpass.com

第2回 教育AI・ビッグデータ分析WGセミナー

「スタディサプリを支えるデータ分析基盤 ~設計の勘所と利活用事例~」

lebac.jp

Food&Drink meetup include matz #5 ~怠惰な働き方を支える技術~

「13段クエリを怠惰に走らせる」

rmp-quipper.connpass.com

Treasure Data でクエリを書く時に真っ先に頭に浮かべたい UDF、それが TD_TIME_RANGE()

Treasure Data での時間の範囲指定にはとにかく TD_TIME_RANGE() を使おう

この記事で言いたいことはこれにつきます。

Treasure Data での唯一のパーティションキーは time だけ

Treasure Data ではユーザがインデックスを作成したりパーティションキーを指定することはできず、あるのは time カラムによるパーティションだけです。

例えば、

SELECT 
  time, 
  td_client_id
FROM pageviews
ORDER BY time ASC

はもちろんフルスキャンですが、ここで 2017-03-01 以降のログが欲しい!というとき

SELECT 
time, 
td_client_id
FROM pageviews 
WHERE '2017-03-01' < to_iso8601(from_unixtime(time))
ORDER BY time ASC

とかすれば OK… ではありません。

このクエリを打つと TD の実行ログのところに

started at 2017-03-13T17:04:38Z
executing query: SELECT 
time, 
td_client_id
FROM pageviews 
WHERE '2017-03-01' < to_iso8601(from_unixtime(time))
ORDER BY time ASC
**
** WARNING: time index filtering is not set on pageviews
** This query could be very slow as a result.
** Please see https://docs.treasuredata.com/articles/presto-performance-tuning#leveraging-time-based-partitioning
**
Query plan:
- Stage-0
    Partitioning: SINGLE
    PartitionFunction: SINGLE
    -> Output[7]
        Columns: time = time:bigint, td_client_id = td_client_id:varchar
        -> Sort[3]
            OrderBy: time ASC NULLS LAST
            -> RemoteSource[13]
                Sources: Stage-1
- Stage-1
    Partitioning: SOURCE
    PartitionFunction: UNKNOWN
    -> Project[12]
        Assignments: 
        -> Filter[11]
            Condition: ('2017-03-01' < "to_iso8601"("from_unixtime"(CAST("time" AS DOUBLE))))
            -> TableScan[0]
                Table: pageviews
                Columns: td_client_id:varchar = td_client_id:"td_client_id",

などと出ると思います。

注目すべきは

WARNING: time index filtering is not set on pageviews This query could be very slow as a result. ** Please see https://docs.treasuredata.com/articles/presto-performance-tuning#leveraging-time-based-partitioning

のところで、お前のクエリ(フルスキャンで)くそ重くなるからこのURL見て出直してこいって親切にURLまで書いてあります。

ここでこのクエリを TD_TIME_RANGE() を使って

SELECT 
time, 
td_client_id
FROM pageviews 
WHERE TD_TIME_RANGE(time, '2017-03-01', NULL, 'JST')
ORDER BY time ASC

などと書くと

started at 2017-03-13T17:07:08Z
executing query: SELECT 
time, 
td_client_id
FROM pageviews 
WHERE TD_TIME_RANGE(time, '2017-03-01', NULL, 'JST')
ORDER BY time ASC
Query plan:
- Stage-0
    Partitioning: SINGLE
    PartitionFunction: SINGLE
    -> Output[7]
        Columns: time = time:bigint, td_client_id = td_client_id:varchar
        -> Sort[3]
            OrderBy: time ASC NULLS LAST
            -> RemoteSource[13]
                Sources: Stage-1
- Stage-1
    Partitioning: SOURCE
    PartitionFunction: UNKNOWN
    -> Project[12]
        Assignments: 
        -> Filter[11]
            Condition: ("time" BETWEEN BIGINT '1488294000' AND 9223372036854775806)
            -> TableScan[0]
                Table: pageviews
                Columns: td_client_id:varchar = td_client_id:"td_client_id", time:bigint = time:"time"
                ** Time indexes:
                    Time index: [2017-02-28 15:00:00 UTC, 9999-12-31 23:59:59 UTC]

となって警告が消え、

** Time indexes:
   Time index: [2017-02-28 15:00:00 UTC, 9999-12-31 23:59:59 UTC]

などと確かに time index が効いていそうな感じが醸し出されています。

手元の環境でこの実行速度を比べると 45sec だった実行時間が 28sec (およそ1.6倍の性能向上)になりました

TD_TIME_RANGE() を使うべし。BETWEEN も要注意!

Treasure Data を利用する上で最も簡単でかつ有効なアドバイスは、「時間を扱うときは必ず UDF を使え」というものです。

上の警告メッセージにもありますが https://docs.treasuredata.com/articles/performance-tuning を読むといろんな注意点があることがわかります。

例えば先頭にある 1) WHERE time <=> Integer ですが

1) WHERE time <=> Integer

When the ‘time’ field within the WHERE clause is specified, the query parser will automatically detect which partition(s) should be processed.

とあり、time をWHERE句に入れると自動的にパーティションを絞ってくれるように書いてありますが

Please note that this auto detection will not work if you specify the time with float instead of int.

とあるように int 以外の型で指定すると無効になる、というのがわかります。

例えば

SELECT field1, field2, field3 FROM tbl WHERE time > 1349393020
SELECT field1, field2, field3 FROM tbl WHERE time > 1349393020 + 3600
SELECT field1, field2, field3 FROM tbl WHERE time > 1349393020 - 3600

は GOOD ですが

SELECT field1, field2, field3 FROM tbl WHERE time > 13493930200 / 10 
SELECT field1, field2, field3 FROM tbl WHERE time > 1349393020.00

は BAD です。直接 float で比較するケースは余りない気がしますが除算をしてしまうとダメだということです。当然前述の文字列比較もダメなわけです。

さらには BETWEEN、これも BAD でした。

SELECT field1, field2, field3 FROM tbl WHERE time BETWEEN 1349392000 AND 1349394000

普通にSQL書く感覚だとつい書いてしまうと思います。

これに対して TD_TIME_RANGE() は int/long/string を受け付けるので

TD_TIME_RANGE(time, '2017-03-01', '2017-03-10', 'JST')

みたいな書き方が可能です(BETWEEN とは異なり終端期間を含まないことに注意)。しかもタイムゾーンも簡単に指定できます。

つまり、何も考えずに TD_TIME_RANGE() を使うのが最も間違い無いのです。

TD には便利な時間系の UDF が揃っていますのでDATE系の標準SQLの関数を使う前にまず

docs.treasuredata.com

をご覧になることをお勧めします。

補足:TD_TIME_RANGE() と TD_TIME_FORMAT() を併用するとダメなのか?

上のドキュメントによると TD_TIME_RANGE()TD_TIME_FORMAT() を突っ込むと time index が無効化されると書いてあります。

However, if you use TD_TIME_FORMAT UDF or division in TD_TIME_RANGE, time partition opimization doesn’t work. For instance, the following conditions disable optimization.

また、

blog-jp.treasuredata.com

にも

ここで注意すべきは「月初から現在まで」といった start_time に 2014-09-01 などの文字列として設定したい場合です。TD_TIME_FORMAT については後述しますが,以下は time index pushdown が利用できない例です:

– 月初から現時点までのレコードを抽出する(time index pushdown が効かない例)

SELECT … WHERE TD_TIME_RANGE(time, TD_TIME_FORMAT(TD_SCHEDULED_TIME(), ‘yyyy-MM-01’), TD_SCHEDULED_TIME())

現時点では TD_TIME_FORMAT で動的に文字列を生成すると pushdown が効かなくなる仕様になっています。

と書いてあります。

しかしこれを実行してみると

started at 2017-03-11T18:02:34Z
executing query: SELECT TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST')
FROM pageviews 
WHERE TD_TIME_RANGE(time, TD_TIME_FORMAT(TD_SCHEDULED_TIME(), 'yyyy-MM-01'), TD_SCHEDULED_TIME(), 'JST')
ORDER BY time ASC
Query plan:
- Stage-0
    Partitioning: SINGLE
    PartitionFunction: SINGLE
    -> Output[7]
        Columns: _col0 = td_time_format:varchar
        -> Project[14]
            Assignments: 
            -> Sort[3]
                OrderBy: time ASC NULLS LAST
                -> RemoteSource[13]
                    Sources: Stage-1
- Stage-1
    Partitioning: SOURCE
    PartitionFunction: UNKNOWN
    -> Project[12]
        Assignments: td_time_format:varchar = "td_time_format"("time", CAST('yyyy-MM-dd' AS VARCHAR), CAST('JST' AS VARCHAR))
        -> Filter[11]
            Condition: ("time" BETWEEN BIGINT '1488294000' AND BIGINT '1489287599')
            -> TableScan[0]
                Table: pageviews
                Columns: time:bigint = time:"time"
                ** Time indexes:
                    Time index: [2017-02-28 15:00:00 UTC, 2017-03-12 02:59:59 UTC]

となりちゃんと指定されたパーティションを見に行っているように見えます。

Presto だからかな?と思って Hive で投げても

Hive history file=/mnt/hive/tmp/5397/hive_job_log_c1869776-51fb-4138-8a6f-141dfb872efd_955348095.txt
**
** Time indices:
**    Time index: [2017-02-28 15:00:00 +0000, 2017-03-12 02:59:59 +0000]
**

となっていてドキュメントが書かれた時分よりクエリ最適化が進化して施されている様子でした。

気になって Treasure Data に問い合わせたところ、TD_ 系の UDF はすでに time index pushdown に対応していて、ドキュメントも近く更新されるということでした。これで安心して TD_TIME/DATE_* が使えますね。

デブサミ 2017 での講演の感想など #devsumi #devsumiB

f:id:beniyama:20170220223816j:plain

大変ありがたいことに 2/17(金)にデブサミにて登壇する機会をいただいたので、目黒の雅叙園に行ってまいりました。

event.shoeisha.jp

思えば去年はまさにこのスタディサプリ移管の真っ最中で公募枠に応募することすらできず、今回は溜まりに溜まったネタを丸ごとぶつけました。

そのせいもあってスライドもモリモリの84ページ。

こんな感じになりました。

まだまだデータの組織としては小さいですし、データの文化が根付いているかと言われるとそんなことないですし、機械学習みたいな先端のデータ活用ブンブンしてるかと言われると全くこれからなんですが、データの組織を立ち上げたばかり、あるいはこれから立ち上げたい人や企業にとって等身大の現場のお話はできたのではないかなと思っています。

実際、Ask the speaker でもデータの組織について悩みを共感していただける方に複数お声がけいただき、やはりみなさん色々悪戦苦闘しているのだなと再認識しました。データの組織といってもサイエンティスト、アナリスト、エンジニア…と違う人格が入り混じってますし、もっと大きな組織であればそのロールごとに部署があったりするので一概には言えませんが、だいたい10~20名程度の組織規模に関して言えば現状の延長線で考えられる気がしています。

ちなみに講演中は全然気づかなかったのですが人生初のグラフィックレコーディングもしていただいていました!感激すぎる。

こうやって画像になっているとスライド探さなくても Twitter だけでどんな感じのセッションだったか感じがわかって良いですね。参加できなかったセッションのとかとてもありがたい。

そしてリハの最中に気付いたのですが今回登壇したB会場は2年前と同じ会場でした。前回は残席わずかとなっていたところ、今回は立ち見の方もちらほら見える感じでお越しいただいて感謝の気持ちでいっぱいです(Ask the speaker もぼっちにならなくて嬉しかった)

beniyama.hatenablog.jp

ちなみに本番前にこの記事を読み返していて

特に竹迫さんの『ドットコムバブルの再来~アセンブラ短歌を一句~』は素晴らしいの一言でした

と言及されている竹迫さんが今や自分の直属の上司になっているということに気づいたのでした。面白い縁だな〜と思うと同時に、何でもブログに書いておくもんだなと(当時面識なかったので書いておかなかったら気づくこともなかった)。

まだまだスタディサプリのデータ周りは色々やりたいこと、面白いこと、アイデア諸々たくさんあるのでまたどこかの機会でお話しできると良いな、と思います。

Digdag / Treasure Workflow でプラグインを使わずに `http:>` オペレータで Slack 通知を行う

ワークフローエンジンの Digdag を本格的に使い始めたのですが、バージョン 0.9.3 の現在、通知系のオペレータは mail:> くらいしかありません。

プラグインを使うことで Slack 通知が可能になりますが、Digdag のホスティングサービスである Treasure Workflow で同じことをしようとしてややハマったのでメモしておきます。

Digdag で Slack プラグインを使う

Digdag には digdag-slack という素敵プラグインがあるのですが、残念ながら最新の 0.9.3 では動作しないということで @bwtakacy さんが qiita.com という記事を書いてくれました。

その結果、

_export:
  plugin:
    repositories:
      - file://path/to/workspace/digdag-slack
    dependencies:
      - jp.techium.blog:digdag-slack:0.1.2

+step1:
  slack>: message.txt
  webhook: https://hooks.slack.com/services/XXXXXXXX  # <-- Slack Incoming WebHooks url
  channel: general
  username: webhookbot
  icon_emoji: ghost

という感じで動かすことができるようになったのですが、これを Treasure Workflow で動かそうとすると

2017-01-31 21:04:53.077 +0000 [INFO] (0589@+my_project+step1) io.digdag.core.agent.OperatorManager: slack>: message.txt
2017-01-31 21:04:53.077 +0000 [ERROR] (0589@+my_project+step1) io.digdag.core.agent.OperatorManager: Configuration error at task +my_project+step1: Unknown task type: slack (config)

などと怒られてしまいます。

@myui さんの

github.com

を参考に Jitpack に登録して

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.myui:digdag-plugin-example:v0.1.2

などとしても結果は同じでした。

http:> オペレータで Webhook を叩くのが楽

結局、

+step1:
  http>: https://hooks.slack.com/services/XXXXXXXXXX  # <-- Slack Incoming WebHooks url
  method: POST
  content:
    text: "タスク1の通知です :tada:"
  content_format: json

などとして標準梱包の http:> オペレータを使うようにしてプラグインが使えない問題を回避しています。Digdag も Treasure Workflow も同じコードでいけるはずなので、これが現状一番手軽かもしれません。

Incoming Webhooks | Slack に記載されているような他のオプションも投げられるはずなので

+step1:
  http>: https://hooks.slack.com/services/XXXXXXXXXX  # <-- Slack Incoming WebHooks url
  method: POST
  content:
    username: "ghost-bot"
    icon_emoji: ":ghost:"
    channel: "#other-channel"
    text: "タスク1の通知です :tada:"
  content_format: json

というようにもできるはずです(未検証)。

(余談)0.9.3 で digdag push から -p パラメータがなくなった

digdag-slack プラグインの README では

_export:
  plugin:
    repositories:
      - file://${repository_path}
    dependencies:
      - jp.techium.blog:digdag-slack:0.1.0

として

$ digdag r hello_world.dig -p repository_path=/path/to/workspace/digdag-slack

というように repository のパスを -p で指定するようになっていますが、これを push しようとした時 digdag 0.9.3 では -p オプションが無くなっているためうまくいきません。

$ digdag push --help
2017-02-01 02:31:38 +0900: Digdag v0.9.3
Usage: digdag push <project> -r <revision>
  Options:
        --project DIR                use this directory as the project directory (default: current directory)
    -r, --revision REVISION          specific revision name instead of auto-generated UUID
        --schedule-from "yyyy-MM-dd HH:mm:ss Z"  start schedules from this time instead of current time
    -e, --endpoint HOST[:PORT]       HTTP endpoint (default: http://127.0.0.1:65432)
    -L, --log PATH                   output log messages to a file (default: -)
    -l, --log-level LEVEL            log level (error, warn, info, debug or trace)
    -X KEY=VALUE                     add a performance system config
    -c, --config PATH.properties     Configuration file (default: /Users/yamabe/.config/digdag/config)

結局上述したように file://path/to/workspace/digdag-slack というようなパスを埋め込むことにしたのですが、他に良いやり方がある気がします。

Embulk で任意のカラムをマスクする embulk-filter-mask プラグインを公開しました

久しぶりの投稿ですが生きております。

ふと Embulk のプラグインを見よう見まねで作ってみました。

github.com

Embulk で転送かけるデータのカラムを指定して * で置換を行うフィルタープラグインです。

センシティブな情報なのでマスクしたい、がカラムごと削ってしまうと入力有無がわからなくて困る…あるいは JSON の一部分だけ不要なんだけど他は連携する必要がある…などの時に使えるかなと思い作りました。

README にもある通り、例えば

first_name last_name gender age contact
Benjamin Bell male 30 bell.benjamin_dummy@example.com
Lucas Duncan male 20 lucas.duncan_dummy@example.com
Elizabeth May female 25 elizabeth.may_dummy@example.com
Christian Reid male 15 christian.reid_dummy@example.com
Amy Avery female 40 amy.avercy_dummy@example.com

というデータを input に食わせた時、

filters:
  - type: mask
    columns:
      - { name: last_name}
      - { name: age}
      - { name: contact, pattern: email, length: 5}

という設定を embulk の config に記述しておくと

first_name last_name gender age contact
Benjamin **** male ** *****@example.com
Lucas ****** male ** *****@example.com
Elizabeth *** female ** *****@example.com
Christian **** male ** *****@example.com
Amy ***** female ** *****@example.com

このようなデータが output に渡されます。

カラム名を指定しただけの状態では単純に文字数分の * に置換されますが、例えばlength オプションで5文字と指定することで一律 ***** とすることも可能です。文字長に意味を持たせたくないときなどによろしいかと思います。

あとは、pattern: email とするとメールアドレスのドメイン部分(@ 以降)は残したままでマスクを行います(デフォルトは pattern: all の全文字置換)。

JSON 型のサポートもしており、

{
  "full_name": {
    "first_name": "Benjamin",
    "last_name": "Bell"
  },
  "gender": "male",
  "age": 30,
  "email": "test_mail@example.com"
}

という構造のデータが入っている user というカラムがあった時、

filters:
  - type: mask
    columns:
      - { name: user, paths: [{key: $.full_name.first_name}, {key: $.email, pattern: email}]}    

などとすると $.full_name.first_name$.email の二つの JSONPath に合致するノードをマスクします。

またこの時、前述の patternlength オプションも個別に指定が可能なので、上記の例の出力は

{
  "full_name": {
    "first_name": "********",
    "last_name": "Bell"
  },
  "gender": "male",
  "age": 30,
  "email": "*********@example.com"
}

のようになります。

指定されたカラムの value が文字列以外の型だった場合は一回文字列に変換したものを置換します。例えば [0, 1, 2, 3] のような配列は ********* になります。

あるいは上の例で $.full_name などと指定すると

  {
    "first_name": "Benjamin",
    "last_name": "Bell"
  }

の部分が単なる文字列として解釈されてまるっと * になります。

だいぶ乱暴というか投げやりなのでこの辺は今後直して各ノードの value 部分だけをマスクして

  {
    "first_name": "********",
    "last_name": "****"
  }

などとできるようにしたいところです。

JSONPath については

github.com

を内部で使っていますので、そこで解釈できるパスであれば基本的に動作すると思います。

また今回プラグインの作り方、特にテストの書き方や JSON の扱い方などは

qiita.com

や、その中でも題材にされている embulk-filter-expand_json プラグイン

github.com

を参考にさせていただきました。ありがとうございました!