本投稿はLancers(ランサーズ) Advent Calendar 2018の4日目の記事です。
昨日は、Slack の簡易エンドポイントを Amazon API Gateway で作った話についての記事でした。

本日は、ElasticCloudからAmazon Elasticsearch Serviceへの移行して良かったこと大変だったことについて話します。

概要

会社でとあるサービスのバックエンドにElasticsearchのマネージドサービスであるElasticCloudを使用していた。
ElasticCloudはElasticsearchの開発元であるElastic社が提供しているサービスで、Elasticsearchを容易に管理できる。

しかしながら、AWSでもElasticsearchをマネージドサービスで提供しており、VPCで管理できるようになった今、普段利用しているAWSに統合できた方が総合的に管理しやすくコスト管理しやすいだろうとなり、移行することとなった。
※追記あり

本ブログではどのように移行を行っていったかをまとめていく。
ここではあくまでも移行の手順のみになりElasticsearchに関するチューニング等は一切記載していない。
なお、ElasticCloudからの移行手順と記載したが、AWSの異なるAWSアカウントIDや異なるリージョン間での移行でも参考になるかもしれない。
正直かなり色々なステップがあり、長い記事になってしまったので、最初に要約を書いておく。

要約

  • ElasticCloudからAmazon Elasticsearch Serviceへは移行できる。
  • 移行する際は同じリージョンのS3を作って、そこにスナップショットを取得して、S3のクロスリージョンコピーを利用すれば別リージョンでもデータをコピーできる。
  • stream2es(Elastic社が開発)とScrollAPIを使えば、比較的簡単に大量データをindex単位でリストアできる。
  • VPC(Proxy)配下にElasticsearchを設置した場合にtd-agentがElasticsearchにつなげなくなることがあるのでマジ注意!(reload_connections falseに設定する)

構成

構成は比較的シンプルで、サイトへのアクセスがあったらnginxのログをtailしているFluentdがElasticsearchへ必要なログを保存する。
それをRailsでできたアプリケーションサーバーから管理、閲覧できるようになっている。

最初に

チームで移行しようと決まった時点で、どのような移行手順になるのか、そもそもどうやって移行するのかがわからず、まずは移行が行えるかどうかというところからのスタートだった。
ただ、調べてみるとそもそもElastic社が、自社のElasticCloudからの移行についてヘルプを記載してくれている。

Snapshot and Restore with Custom Repository

Elastic社のElasticCloudもバックエンドはAWSを利用しており、S3にスナップショットを取得することができる。
同一リージョンであれば、スナップショットの取得先を別のAWSアカウント、つまり我々の環境にデータを保存することができる。


移行手順(前編)

スナップショットのリストアまでの手順としては下記の通り。

  1. ElasticCloudはap-norheast-1は未対応なので、最初にElaticCloudのS3が設置されているリージョンと同じap-southeast-1に自分のAWSアカウントでS3バケットを作成する
  2. 上記バケットへのアクセス可能なユーザーを作成する
  3. 作成したアクセスキーでElasticCloudの本番へリポジトリを登録する
  4. リポジトリへスナップショットを取得する
  5. ap-northeast-1にバケットを作成する
  6. ap-northeast-1のバケットへのアクセス許可を最初に作成したユーザーへアタッチする
  7. S3のデータをap-northeast-1のS3バケットへクロスリージョンコピーする

実際の手順

スナップショットの保存先登録、保存、S3コピー

S3バケットの作成とIAMユーザー作成は、通常の手順なので省略する。
XXXXX-proddataというバケットを作成したとする。
作成したS3バケット、およびアクセスキー、シークレットアクセキーをElasticCloud側へ登録する。

% curl -XPUT 'https://Elastic社のリポジトリ.ap-southeast-1.aws.found.io:ポート番号/_snapshot/proddata-snapshots' -d '{
  "type": "s3",
  "settings": {
    "bucket": "XXXXX-proddata",
    "region": "ap-southeast-1",
    "access_key": "アクセスキー",
    "secret_key": "シークレットキー",
    "compress": true
  }
}'

うまくいくと、Found、つまりElasticCloudのスナップショット取得先と自社管理のS3の情報が表示される。

$ curl -XGET "https://Elastic社のリポジトリ.ap-southeast-1.aws.found.io:ポート番号/_snapshot/_all?pretty"
{
  "found-snapshots" : {
    "type" : "s3",
    "settings" : {
      "bucket" : "XXXXXXXXXXXXXXX",
      "server_side_encryption" : "true",
      "base_path" : "snapshots/XXXXXXXXXXXXXXX",
      "region" : "ap-southeast-1",
      "compress" : "true"
    }
  },
  "proddata-snapshots" : {
    "type" : "s3",
    "settings" : {
      "bucket" : "XXXXX-proddata",
      "secret_key" : "シークレットキー",
      "region" : "ap-southeast-1",
      "compress" : "true",
      "access_key" : "アクセスキー"
    }
  }
}

次にスナップショットを取得する。
最初に空であることを確認。

$ curl -XGET "https://Elastic社のリポジトリ.ap-southeast-1.aws.found.io:ポート番号/_snapshot/proddata-snapshots/_all?pretty"
{
  "snapshots" : [ ]
}

スナップショットを取得。

$ curl -X PUT 'https://Elastic社のリポジトリ.ap-southeast-1.aws.found.io:ポート番号/_snapshot/proddata-snapshots/2018061401?wait_for_completion=true'

スナップショットが取得できていることを確認。

$ curl -X GET 'https://Elastic社のリポジトリ.ap-southeast-1.aws.found.io:ポート番号/_snapshot/proddata-snapshots/_all?pretty' | egrep "state|end_time"

    "state" : "SUCCESS",
    "end_time" : "2018-06-14T07:12:05.335Z",
    "end_time_in_millis" : 1526627525335,

次に、Amazone Elasticsearch Searviceを配置予定のリージョンと同じS3バケットを作成する。※XXXXX-proddata-tokyoとする。
そして、そのS3バケットへ上記で取得したスナップショットデータをs3cmdを使ってsyncする。
なお、s3cmdのsyncでデータをコピーしたが、途中タイムアウトしてしまったので、下記を~/.s3cfg へ追記した。

socket_timeout = 300 

syncを実行。

$ s3cmd -c ~/.s3cfg-es sync s3://XXXXX-proddata/index s3://XXXXX-proddata-tokyo/

duでオブジェクト数とサイズが同じであることを確認した。(tokyoとついている方がap-northeast-1で、ついていないほうがap-southeast-1)

$ s3cmd -c .s3cfg-es du s3://XXXXX-proddata/
40225622452 40215 objects s3://XXXXX-proddata/
$ s3cmd -c .s3cfg-es du s3://XXXXX-proddata-tokyo/
40225622452 40215 objects s3://XXXXX-proddata-tokyo/

Amazon Elasticsearch Serviceへリストアする

まず、Amazon Elasticsearch Serviceを構築する。手順は通常と同じなので省略する。
最初、templateの設定をしておらずアプリケーションがうまく動作しなかった。(当たり前といえば当たり前だが、、)
templateの設定手順としては事前にcurlでGETした値を下記のようにPUTで投げてあげるだけで良い。

$ curl -XPUT "https://XXXXX.ap-northeast-1.es.amazonaws.com/_template/XXXXX-template" -d '
{
ここに値
}'

リストアの手順は下記の通り。
Amazon Elasticsearch Service インデックススナップショットの使用

まず、とあるEC2インスタンスで作業を行ったので、IAM Roleの設定を行った。

{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "",
    "Effect": "Allow",
    "Principal": {
      "Service": "es.amazonaws.com"
    },
    "Action": "sts:AssumeRole"
  }]

ただ、上記だけでは駄目だったので、EC2も信頼関係へ追加した。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "ec2.amazonaws.com",
          "es.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

次に、ドキュメントのpythonを実行してスナップショットを認識させた。

$ curl -XGET https://XXXXX.ap-northeast-1.es.amazonaws.com/_snapshot?pretty
{
  "cs-automated" : {
    "type" : "s3"
  },
$ python script_tokyo.py
200
{"acknowledged":true}
$ curl -XGET https://XXXXX.ap-northeast-1.es.amazonaws.com/_snapshot?pretty
{
  "cs-automated" : {
    "type" : "s3"
  },
  "my-snapshot-repo-tokyo" : {
    "type" : "s3",
    "settings" : {
      "bucket" : "XXXXX-proddata-tokyo",
      "region" : "ap-northeast-1",
      "role_arn" : "arn:aws:iam::XXXXX:role/XXXXX-proddata-tokyo"
    }
  }

これで、スナップショットが認識された。
つぎにスナップショットのリストアを実行。

$ time curl -XPOST "https://XXXXXX.ap-northeast-1.es.amazonaws.com/_snapshot/my-snapshot-repo-tokyo/snapshot-2018061401/_restore" -d '
> {
>     "indices": "*",
>     "ignore_unavailable": "true",
>     "include_global_state": false
> }'

real    0m59.229s
user    0m0.044s
sys     0m0.064s

リストアコマンドはすぐに終わったが、シャードの再配置が走るので、ぶっちゃけ5時間くらいリストアにかかった。

$ curl -s -XGET "https://XXXXXX.ap-northeast-1.es.amazonaws.com/_cluster/health?pretty"
{
  "cluster_name" : "XXXXXX:es-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 2,
  "number_of_data_nodes" : 2,
  "active_primary_shards" : 1013,
  "active_shards" : 2026,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

また、最初は上記AWSのドキュメントの通りにやっても権限周りで全くうまくいかなかった。
そこで、AWSサポートに問い合わせを行ったところ、日本語ドキュメントと英語ドキュメントで手順が異なっており、英語のほうでやってほしいと言われた。
具体的にはIAMRoleで権限を付与してあげる点について、記載が日本語のほうにはなかったように記憶している。
というのも、それから数日後に日本語ドキュメントの手順も英語と同じ内容にアップデートされていたので、AWSサポートの方が対応してくれたようだった。※本問い合わせに起因したものかどうかは不明。

この時点で、リストアの時間を計測して深夜メンテナンス5時間丸々サービスを停止するのか、別の方法があるのかを模索することになった。


移行手順(後編)

stream2esとScrollAPIで差分リストア

5時間メンテナンスにするには深夜メンテナンスなどが必要そうなサービスだったので、別の方法を模索したところ、下記記事を見つけた。

Stream2esと複数データの登録

stream2esはElastic社が作成しているOSSなので、信頼性が高い。

https://github.com/elastic/stream2es

stream2esを使うとindex単位でリストアができるようだということがわかったので、単一のindexをリストアしたところうまくいった。
なお、インストール手順は下記の通り。

% curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es

ただ、一個一個バックアップしてリストアするのは中々に骨が折れる。
そこでチームメンバーにも相談したところ、ElasticsearchのScrollAPIを使えば良さそうではという助言をいただいた。
ScrollAPIの使い方については下記の方の記事が参考になった。

ElasticsearchのScroll APIをためしてみた

手動でやってみて、問題なさそうであることが確認できたので、ざっくり下記のようなシェルスクリプトを作成して実行した。

#!/bin/bash

set -e

### Set Endpoint
FOUNDES="https://Elastic社のリポジトリ.ap-southeast-1.aws.found.io:ポート番号"
AWSES="https://XXXXXX.ap-northeast-1.es.amazonaws.com"

### Set scroll api params
APITIME='10m'
APISIZE='200000'

### Backup & Restore Index name
INDEXNAME='es-log-2018.07.21'

### Log
SECONDS=0
LOGFILE="/tmp/esbackup-`date +%w`.log"
exec > "${LOGFILE}"
exec 2>&1

### Start time
echo "$(date) : SCRIPT starttime"

### Check Command
echo "$(date) : Check command stream2es & jq"

which stream2es
if [ $? -ne 0 ];then echo "not found stream2es";exit 102;fi

which jq
if [ $? -ne 0 ];then echo "not found jq";exit 102;fi

### Check count
INDEXCOUNT=$(curl -s -XGET "${FOUNDES}/${INDEXNAME}/_count" | jq . | grep count | awk -F':' '{print $2}'| sed 's/,//g')
CHECKAPISIZE=$(expr ${INDEXCOUNT} / ${APISIZE})

if [ "${CHECKAPISIZE}" -ne 1 ];then
  echo "${CHECKAPISIZE}"
  echo "## Not match APISIZE.. "
fi

### Get snapshot index
echo "$(date) : Start get snapshot index"
time curl -s -XGET "${FOUNDES}/${INDEXNAME}/_search?scroll=${APITIME}&size=${APISIZE}&pretty" > /tmp/${INDEXNAME}.log.1

### Get scroll ID
SCRID=$(cat /tmp/${INDEXNAME}.log.1| grep scroll_id | awk '{print $3}' | sed 's/["|,]//g' )

### Get next snapshot index
echo "$(date) : Start get next snapshot index"
time curl -s -XGET "${FOUNDES}/_search/scroll" -d "${SCRID}" > /tmp/${INDEXNAME}.log.2

### Add all index
echo "$(date) : Modifiy index file"
cat /tmp/${INDEXNAME}.log.{1,2} | jq -c .hits.hits[]._source > /tmp/${INDEXNAME}.log.all

### Restore index
echo "$(date) : Delete index"
curl -s -XDELETE "${AWSES}/${INDEXNAME}"
echo -e "\n$(date) : Restore index\n"
time stream2es stdin --target ${AWSES}/${INDEXNAME}/es-log < /tmp/${INDEXNAME}.log.all

### Check count 2
INDEXCOUNT2=$(curl -s -XGET "${AWSES}/${INDEXNAME}/_count" | jq . | grep count | awk -F':' '{print $2}'| sed 's/,//g')

sleep 10

if [ "${INDEXCOUNT}" -eq "${INDEXCOUNT2}" ];then
  echo "OK. Match count of index!"
else
  echo "Not match count of index.. Please check count of index."
fi


### End time
echo -e "\n$(date) : SCRIPT endtime"
echo "[INFO] Script time is ${SECONDS}sec."


やっていることは、ScrollAPIを使って、指定した日付のデータをファイルとして出力する。
20万ドキュメントを2回取得している理由としては1日のドキュメント数が40万未満だったため。
その後、jqを使って必要な箇所のみを抽出して、リストアするイメージだ。

また、すべてのindexが正しく、リストアされているか確認するために下記のような簡易的なRubyスクリプトで確認した。

<br />require 'open-uri'
require 'openssl'
require 'json'
require 'date'
require 'dotenv'

# Basic auth
Dotenv.load ".env"
USER = ENV["BASICUSER"]
PASS = ENV["BASICPASS"]

(Date.parse(ARGV[0])..Date.parse(ARGV[1])).each do |date|

  checkdate = date.strftime("%Y.%m.%d")
  puts "\e[33mCheckStart : #{checkdate}\e[0m"

  # Found URL
  esuri = "https://XXXXX/index-#{checkdate}/_count"
  begin
    html = open(esuri,{:http_basic_authentication => [USER, PASS],:ssl_verify_mode => OpenSSL::SSL::VERIFY_NONE}).read
    esresult = JSON.parse(html)
    escnt = esresult["count"]
  rescue => e
    puts e
    next
  end

  # AWS URL
  awsuri = "https://XXXXX/index-#{checkdate}/_count"

  begin
    html = open(awsuri,{ :ssl_verify_mode => OpenSSL::SSL::VERIFY_NONE}).read
    awsresult = JSON.parse(html)
    awscnt = awsresult["count"]
  rescue => e
    puts e
  end

  puts "\e[36mES\e[0m    : index count #{escnt}"
  puts "\e[36mAWS\e[0m   : index count #{awscnt}"

  if escnt == awscnt
    puts "\e[32mindex count OK\e[0m\n\n"
  else
    puts "\e[31mNG... Check index-#{checkdate} count\e[0m"
    exit
  end

end

引数に西暦から日付の範囲を指定することで、index数が合わないところで停止するようにした。
なぜRubyにしたかと言えば、○○日〜○○日までチェックということをするのに、シェルスクリプトでやるには手間がかかりそうだったので、RubyのDate関数で雑に範囲指定したかった、ただそれだけなので確認方法は別になんでも良いと思っている。

上記でメンテナンス日直前まで差分リストアすれば良い。
メンテナンス当日も、シェルスクリプトでやっているように当日データの差分だけリストアすることで、サービス停止時間を5時間から10分に減らすことができた。

以上で移行手順は終わり。


障害

メンテナンスは無事終了し、つつがなく終了することができた。
が、しばらくしてサービス担当者からサービスが稼働していないことを報告された。
原因はtd-agentがエラーを吐いて、Elasticsearchに繋げなくなってしまったことだった。
エラーログは下記の通り。

018-09-06 20:07:47 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2018-09-06 20:07:48 +0900 error_class="Elasticsearch::Transport::Transport::Error" error="Cannot get new connection from pool." plugin_id="object:3fad47cb98d0"
  2018-09-06 20:07:48 +0900 [warn]: suppressed same stacktrace

対応手順についてはいくつかのサイトを参考にさせていただいたが、下記kakakakakkuさんの記事がわかりやすく説明されていた。

fluent-plugin-aws-elasticsearch-service を使う場合は reload_connections を false にする

すでに記事にかかれているように、Elasticsearch ノードに直接アクセスできないようなプロキシ配下やEIPを付与せずにVPC内だけで完結するようなネットワーク構成の場合に、ノードの更新ができずにtd-agentが接続できなくなてしまう。
厳しいのはtd-agent自体は生きているので、死活監視だと検知できないこと。
最終的には下記のオプションを入れて安定した。また、Mackerelでcheck-logを使い、ログ監視を行うことで検知できるようになった。※下記設定を入れてからは問題にはなっていない。

$ diff -u td-agent.conf.20180913 td-agent.conf
--- td-agent.conf.20180913      2018-09-06 11:38:56.460757818 +0900
+++ td-agent.conf       2018-09-14 09:50:14.345677454 +0900
@@ -24,6 +24,12 @@
     queued_chunk_flush_interval 1s
     buffer_queue_limit 1

     buffer_path /var/log/td-agent/buffer/
     buffer_type file
+    reload_connections false
+    reload_on_failure false
+    resurrect_after 60

大変だったこと

Elasticsearhに移行した直後は、特に問題なく動いていたのだが、なぜか一日程度経過してからtd-agentがElasticsearchへアクセスできない障害が発生してしまった。
ログデータをバックアップする仕組みになっていたので、数値はバックアップデータから救出することができたが、時間をかけて移行したのでそれなりにショックが大きかった。
また、お気づきの方もいるかもしれないが、移行に取り組み始めたのは今年の5月で、メンテナンスを実施したのが9月でかなりの長期戦になってしまった。
移行自体は1ヶ月ちょっとくらいで目処がついたのだが、サービス側がちょうど大きく切り替わったりする忙しい時期で、担当者の方も多忙につき全くメンテナンス日を調整できなかったため、切り替えまでに時間を要することになったのが中々に辛かった。

ちなみに言うと、ElaticCloudがそれなりに安かったことや、Amazon Elaticsearch Serviceに移行するタイミングで今後も踏まえてスペックを大きめにしたのでお値段的には少しあがってしまったのはまぁ仕方が無いと思っている。

良かったこと

やはり、AWSに移行できたので、コストの算出や、リソース管理がしやすくなった。
また、今回のナレッジを別のサービスでもきっと生かされるだろうというのがあった。
実際、社内環境ではあったが別でtd-agentを使ってElasticsearchにログを投げている環境でも同様の障害が発生し、上記対応で解消した。
個人的にはAWSの知識とElasticsearchへの知識が深まったのも大きい。

ElasticCloudからのAmazoneESへの移行は少ないかもしれないが、どこかの誰かがElasticsearchの移行を行う際の一助にでもなれば幸いである。

以上。

明日は@yKanazawaさんのCakePHPに関する記事です。


追記

あとからElastic社の中の人から、ElasticCloudの請求をAWSに一括できるという有益な情報をコメントでいただいた。
弊社では社内的な理由によりAWSのマネージドを選択したが、AWSも使っていて請求周りで二の足を踏んでいる人がいたらElastiCloudも検討してみると良さそう。