ChangeAnomalyDetectionパッケージを使ってHadoopにあるデータの異常検知(変化点検出)
先日作ったChangeAnomalyDetectionパッケージを使ってHadoop上のデータの異常検知する仕組みを考えてみる。
今回は、以下のようにhadoop上にデイリーで蓄積される購買履歴のようなデータを想定する
$ hadoop fs -ls /user/yokkuns/buying_history | head Found 100 items -rw-r--r-- 1 yokkuns supergroup 184 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-01 -rw-r--r-- 1 yokkuns supergroup 65 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-02 -rw-r--r-- 1 yokkuns supergroup 76 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-03 -rw-r--r-- 1 yokkuns supergroup 60 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-04 -rw-r--r-- 1 yokkuns supergroup 46 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-05 -rw-r--r-- 1 yokkuns supergroup 145 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-06 -rw-r--r-- 1 yokkuns supergroup 33 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-07 -rw-r--r-- 1 yokkuns supergroup 385 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-08 -rw-r--r-- 1 yokkuns supergroup 238 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-09
異常検知用コマンドの作成
Unixでは、標準入力/出力で扱えるとパイプとか使えて便利なので、それようのコマンドを作る
Rで標準入力を扱うには、file関数でdescriptionにstdinを指定する。また、ついでにレポートメールも出すようにした。
#!/usr/bin/env Rscript library(ChangeAnomalyDetection) library(sendmailR) library(ggplot2) main <- function(){ con <- file(description="stdin", open="r") data <- read.table(con, header=F, stringsAsFactors=F) close(con) names(data) <- c("log_name","date","x") x <- as.numeric(data$x) data$score <- changeAnomalyDetection(x, term=10, order=c(1,0,0)) reportMail(data) write.table(data[,c("log_name","date","x","score")],row.names=F,quote=F,col.names=F) } reportMail <- function(data){ if(data$score[nrow(data)] >= 2){ subject <- paste("Change in the trend: ",data$log_name[1],data$date[nrow(data)]) } else { subject <- paste("Normal:",data$log_name[1],data$date[nrow(data)]) } data$date <- as.POSIXct(data$date) score.graph <- ggplot(data, aes(x=date,y=score,group=log_name)) + geom_line(aes(col=log_name)) from <- sprintf("<changeAnomalyDetection@%s>", Sys.info()[4]) to <- "<yokkuns@tkul.jp>" body <- list(mime_part(data),mime_part(score.graph)) sendmail(from, to, subject, body) } main()
これで、change_anomaly_detection.R に標準入力でデータを渡せば、log名、日付、異常検知したい値、異常スコアが標準出力される。
hadoopコマンドでの実行
とりあえず、購買履歴のようなデータは、1レコードの大きさはほとんど変わらないと思われるので、まずはファイルサイズでトレンドに変化が無いかを見てみる。
急に大きくなっていれば、レコード数が増加を意味するので、何かが流行だしてる可能性があるとか、逆に減っている場合には、飽きられ始めてるかもとか、意外と全体の傾向が見えるので面白い。
hdfs_change_anomaly_detection.sh
hadoop fs -dusコマンドを使って、HDFS上のファイルサイズのリストを取得し、パイプでさっき作ったchange_anomaly_detection.Rに渡す。
#!/bin/bash source ~/.bashrc base_dir=$1 date_from=$2 date_to=$3 dates=`ruby -r date -e "Date.parse('${date_from}').upto(Date.parse('${date_to}')){|d|print d, ','}" | sed -e 's/,$//'` hadoop fs -dus $base_dir/{$dates} | perl -pe 's|hdfs://(.+)$base_dir/||g' | awk '{print "hdfs-size\t"$2"\t"$1}' | ./change_anomaly_detection.R
- usage
./hdfs_change_anomaly_detection.sh <base_dir> <date_from> <date_to>
- 実行例
$ ./hdfs_change_anomaly_detection.sh /user/yokkuns/buying_history 2012-01-01 2012-04-09 | tail -n 30 hdfs-size 2012-03-21 1182 0.721664957812903 hdfs-size 2012-03-22 1179 0.553532151383951 hdfs-size 2012-03-23 1118 0.476977584156091 hdfs-size 2012-03-24 1101 0.405535686607921 hdfs-size 2012-03-25 1088 0.35572429299516 hdfs-size 2012-03-26 1054 1.28066195323186 hdfs-size 2012-03-27 1335 1.79544225093878 hdfs-size 2012-03-28 1439 1.91945745582947 hdfs-size 2012-03-29 1081 2.03207005627137 hdfs-size 2012-03-30 790 1.04783664240509 hdfs-size 2012-03-31 805 0.478221556963071 hdfs-size 2012-04-01 782 0.322057929907172 hdfs-size 2012-04-02 1000 0.198707064341006 hdfs-size 2012-04-03 811 0.205308947142793 hdfs-size 2012-04-04 1033 0.161568530679506 hdfs-size 2012-04-05 973 0.0915506267593984 hdfs-size 2012-04-06 1030 0.102608803858441 hdfs-size 2012-04-07 846 0.125225930603473 hdfs-size 2012-04-08 833 0.253216213846101 hdfs-size 2012-04-09 628 0.185198971252599
hiveでの実行
HDFS上のファイルサイズでの異常検知は、全体のレコード数の傾向は見れるものの、何が増えたのかとか、などの内部の傾向の変化は捉えられない。
そこで、hiveを使ってカテゴリ別の集計を行い、その結果をchange_anomaly_detection.Rに渡すといった方法も考えられる。
- SQL作成
select category, to_date(from_unixtime(buy_time)), count(buying_id) as cnt from buying_history where to_date(from_unixtime(buy_time)) >= '$date_from' and to_date(from_unixtime(buy_time)) <= '$date_to' group by category,to_date(from_unixtime(buy_time))
- 実行
今回は、データを用意出来てないので、実行コマンドだけ。
$ hive -hiveconf date_from=2012-01-01 -hiveconf date_to=2012-04-09 -f category_buy_count.sql | ./change_anomaly_detection.R
hive + R で実行
hiveで集計した結果に、何らかの統計手法などを用いた結果に対して、異常検知を行いたいという事もあるかと思う。
その場合には、hiveで実行した結果を受けて、何らかの処理をした後標準出力するようなスクリプトを組めば、これまでと同様にパイプでつなげるだけで実行出来る
main <- function(argv){ con <- file(description="stdin", open="r") data <- read.table(con, header=F, stringsAsFactors=F) close(con) ... 何らかの処理 ... write.table(data[,c("log_name","date","y")],row.names=F,quote=F,col.names=F) }
- 実行例
こちらもデータを用意出来てないので、実行コマンドだけ。
$ hive -hiveconf date_from=2012-01-01 -hiveconf date_to=2012-04-09 -f category_buy_count.sql | ./hogehoge.R | ./change_anomaly_detection.R