Deep Dive BigQuery
CyberAgentグループでサーバサイドエンジニアをしている 山口( @nao99999 ) です。
CyberAgentメディア管轄の広告プロダクト横断組織(PTA)によるアドベントカレンダー2021最終日の投稿になります。
CyberAgent PTA Advent Calendar
前日は同僚の古川による「Slack Workflow と Google Spreadsheet を合わせると超絶便利な件」でした。
今回は書籍や公式ドキュメントを元にBigQueryの内部構造に関して書きたいと思います。
Introduction
普段業務でBigQueryを使っていて、なんで大きなデータに対して早く結果を返せるか気になったので背景や内部構造などを調べてみました。最後に記載しますが主に下記をまとめたものになります。
References
What’s BigQuery
プロダクトが成長してデータ量が増えていく中で、リレーショナルデータベースで分析するのが困難になったそうです。その後MapReduceを開発したものの集計処理に時間がかかるため、インタラクティブに分析することができないという問題にあたったそうです。最終的にそういう課題のもとBigQueryの前身となるDremelの開発に繋がりました。
Background
DB設計における基本的な過失の一つがテーブルフルスキャンであるそうです。
オプティマイザがさじを投げて高速な方法を見つけることができなかった最後の手段としてテーブルフルスキャンが存在します。現代のDBの多くが何としてもこれを避ける設計となっています。(RDBでindexを貼ることや、NoSQLのアーキテクチャなど)
Dremelの開発者はなぜフルスキャンが低速になってしまうのか考え、1TB以上のテーブルを1秒以下でスキャンすることを目指したそうです。
標準的なハードディスクだと毎秒100MBを読みとることができるとすると、1TBだと3時間弱かかってしまいます。また1TBで1行256バイトだったら1コアで毎秒100万行になるので、テーブル全体を1秒で処理するためには4000コアが必要になってしまいます。
Googleはスケールアップではなく、スケールアウトなアーキテクチャでこの問題の解決をしたのがDremalになります。
(ここでスケールアップはハードの増強を指して、スケールアウトは水平スケールつまりマシンの追加を指します。)
Architecture of BigQuery
ここでBigQueryのサブセットを紹介していきます。
参照したレファレンスが古かったり新しかったりするので、今は違うかもしれませんが、考え方は変わってないはずです。
今回は主にストレージとクエリエンジンについて記述します。
1. Storage
ボトルネックはI/Oなので、それに対するアプローチになります。
- Colossus (並列分散ファイルシステム)
- Capacitor (ストレージフォーマット)
1.1 Colossus
Google File System(GFS)の後継システムです。Colossusに関しては論文などで公開されていないようです。
Colossusは分散ファイルシステムで、データが別々のサーバのディスクに保存されています。データを複数にチャンクしパーティショニングすることで、大量のディスクから並列に読むことが可能となり読み取りの高速化を図っています。
またどれかのサーバーがクラッシュする前提のもと、とあるサーバーが故障したらレプリケーションされた別のサーバから再取得するような仕組みもあるようです。レプリケーションは3箇所のリージョンにまたがってされます。
GCPのブログでも紹介されています。
1.2 Capacitor
Google独自の新しい列志向ファイルフォーマット(Capacitor)になります。
(Apache Parquetなどは過去のBigQueryの列志向フォーマットの論文を参考にして実装されたので近い構造の様です)
従来は行指向でデータが保存されてましたが、分析時に特定の列を読むのに行全体を読み込まなければなりません。また行レベルで繰り返されるデータが少ないために圧縮することも難しいです。
そこで列指向でデータを格納します。
列を個別に格納することで、クエリ時に行全体を読まずに特定の列のみを読み出すことを可能にします。例えば100列あるテーブルで3列だけ読み取る場合3%のみのスキャンで済みます。
列志向ストレージが浸透した理由として、分散ファイルシステムが出来たために別々のディスクから並列に読むことができるという側面もあるそうです。(2つの列を読み取る場合、1つのディスクから列指向のデータを読むとシーク処理が増えるため読み取りが低速になります。)
またストレージや帯域の最適化のために複数の圧縮方式と取っています。カーディナリティに依存しますが列単位の場合は同じ値が続くことが多いので圧縮効率が高くなります。
1つ目は辞書エンコーディング(Dictionary Encoding)です。カーディナリティが低い列に対して適用されるようです。オフセットと値の辞書作って列をエンコードします。カーディナリティが高い列だと辞書が肥大化して圧縮率が下がります。
2つ目はRun-Lengthエンコーディング(RLE)です。これは繰り返すデータに対して、繰り返す項目をまとめます。つまり、値「2」が5回続けて表示される場合、「2,2,2,2,2」を格納するのではなく、「2:5」を格納できます。同じ値が続けば続くほど圧縮率が高くなります。
しかし同じ値が複数回発生しないような場合は圧縮率がさがります。これを解決するためにCapacitorは行を並べ替えることでRLEを適用するそうです。この行をどう並び替えるかはわからないですが、BigQueryの行は順序付けられておらず、どの行が他の行の後に続くかについての保証がないそうです。これに関しては Capacitor のblog でも触れられています。
2. クエリエンジン
次にDremelの話になります。処理のフローにTree構造が取られています。
クエリを受け付けるルートのサーバーを「ミキサー」といい、データを読み込んだり集約するサーバーを「シャード」と言います。基本ルートサーバーでは計算を行わず、シャードサーバーで計算をさせることでスケールアウトに対応させていく方針になります。
いくつか例を見ていくことで挙動を見ていきます。イメージ的にはクエリが最適化のために修正されながら下のサーバーに処理が移譲されるイメージです。
2.1 単純なフィルタークエリ
- ミキサーがデータ量を調べて必要なファイル数を調べます。これは必要なシャード数になります。ミキサーはシャードに並列でリクエストを送ります。
- 各シャードは必要な列を読み込み、フィルターを適用します。
- 各シャードは部分的に合計を取りミキサーにreturnします。(各シャードが
COUNT(*)
を取る) - ミキサーはすべてのシャードの処理を待って、完了したら返ってきた全ての値の合計を取ります。
- 最後にミキサーが結果を返します。
2.2 フィルターと集計クエリ
- 2.1と同じ様にミキサーが各シャードへ並列にリクエストを送ります。
- 各シャードは列を読み込み、フィルターを適用します。
- 各シャード毎
title
ごとにカウントを取り結果を返します。(それぞれのシャードがGROUP BYする。)また次にShuffleするように指示します。 - タイトルフィールドでShuffleを行い、それぞれのシャードに展開されます。
- シャードでそれぞれの合計を取ってミキサーに返します。
- 単一のシャードで値をすべて読み取りソートを行い、結果を返します。
集計のカーディナリティが高い場合は、集計データをシャッフルして、分散ソートして値を集約していきます。(A~Cをソートするシャードや、D~Gをソートするシャードと分かれる)
Shuffleとは
Shuffleの目的は分散して並列処理させることになります。BigQueryにおけるShuffleはHash Partitioningになるそうです。
集計単位やJOINのkey単位で値にHash関数をかけて、値をバケットに分割し行き先のシャードを決定します。つまりあるシャードに対して一定範囲のkey空間が割り当てられるので、同じkeyを持つすべて値は同じシャードが受け取ります。
上のクエリの例だと、「Google」に一致するタイトルのカウントは全て同じバケットに送信されるため、「Google」バケットから値を読み取ると「Google」の合計が計算できます。「Google_Chrome」バケットから値を読み取る場合はその合計が計算できます。
単一のサーバ(ミキサー)でこれを処理しようとすると、全てのデータを一箇所に集める必要があるため現実的ではないので、分散して並列で集計させることで処理の性能を上げている。
2.3 JOINするクエリ (Broadcast JOIN)
片方のテーブルが150MB(制限は変わる)とか小さい場合には、小さい方のテーブルをHashテーブルにして各シャードに展開することでJOIN処理を行います。ここからは少し省略しながら説明します。
- repo_commitsをHashテーブルにして書き込みます。
- repo_languagesは2.2と一緒ですが、各シャードにHashテーブルが送信されます。
- 各シャードで値を集約しながらHashテーブルのチェックをしてJOIN処理をします。
- 最後にソート処理を実行して、結果を返します。
2.4 JOINするクエリ (Hash JOIN)
結合する両方のテーブルが大きいときには、それぞれのテーブルをShuffleすることで並行してJOIN処理をすることができます。
つまりJOINのキーに対してHash Paritioingをかけることで、それぞれのテーブルの値を同じシャードに送ることができます。
- repo_commitsはrepo_nameでShuffleします。
- repo_languagesはlang, author, repo_nameでShuffleします。
- JOINのkeyの値は同じシャードにいるので、各シャードでJOINしながら集計をすることができます。
- 最後にソートして結果を返します。
Execution Result
ここまでを踏まえると、BigQueryはいかに並行にマシンを動かすことで結果を得るのが良いのかなと思いました。シャッフル処理やソート処理は分散処理させるとはいえ集約させる処理が入るのでボトルネックになると思います。わかりやすい処理としては GROUP BY
ORDER BY
DISTINCT
などがあると思います。
経験上ですが大量のレコードに対して前述の処理をすると、実行結果一番右の「ディスクにオーバーフローしたバイト数」というのが高い数値として出てしまうと思います。その時はチューニングできそうだなと思っていて、たいていシャッフルやソートをさせるような処理を遅延させるとクエリの実行時間が短くなり、「シャッフルされたバイト数」や「ディスクにオーバーフローしたバイト数」の値が下がる傾向にあると感じました。
(力尽きてしまい例が出せないので、別記事で検証したいです。)
Conclusion
BigQueryの内部構造に関してツラツラ書きました。GoogleはBigQueryに限らず基本マシンをスケールアウトさせるアーキテクチャを取ることが多いようで、その思考の一端を見れたのかなと思います。
正直まだ書けてないことも多いんですが、少しでも処理のイメージが伝われば幸いです。
References
以上PTAのアドベントカレンダーでした!!!
メリー・クリスマス!!!