ChangeAnomalyDetectionパッケージを使ってHadoopにあるデータの異常検知(変化点検出)

先日作ったChangeAnomalyDetectionパッケージを使ってHadoop上のデータの異常検知する仕組みを考えてみる。
今回は、以下のようにhadoop上にデイリーで蓄積される購買履歴のようなデータを想定する

$ hadoop fs -ls /user/yokkuns/buying_history | head
Found 100 items
-rw-r--r--   1 yokkuns supergroup        184 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-01
-rw-r--r--   1 yokkuns supergroup         65 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-02
-rw-r--r--   1 yokkuns supergroup         76 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-03
-rw-r--r--   1 yokkuns supergroup         60 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-04
-rw-r--r--   1 yokkuns supergroup         46 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-05
-rw-r--r--   1 yokkuns supergroup        145 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-06
-rw-r--r--   1 yokkuns supergroup         33 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-07
-rw-r--r--   1 yokkuns supergroup        385 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-08
-rw-r--r--   1 yokkuns supergroup        238 2012-10-14 13:00 /user/yokkuns/buying_history/2012-01-09

異常検知用コマンドの作成

Unixでは、標準入力/出力で扱えるとパイプとか使えて便利なので、それようのコマンドを作る
Rで標準入力を扱うには、file関数でdescriptionにstdinを指定する。また、ついでにレポートメールも出すようにした。

#!/usr/bin/env Rscript

library(ChangeAnomalyDetection)
library(sendmailR)
library(ggplot2)

main <- function(){
  con <- file(description="stdin", open="r")
  data <- read.table(con, header=F, stringsAsFactors=F)
  close(con)
  names(data) <- c("log_name","date","x")
  x <- as.numeric(data$x)
  data$score <- changeAnomalyDetection(x, term=10, order=c(1,0,0))

  reportMail(data)
  write.table(data[,c("log_name","date","x","score")],row.names=F,quote=F,col.names=F)
}

reportMail <- function(data){
  if(data$score[nrow(data)] >= 2){
    subject <- paste("Change in the trend: ",data$log_name[1],data$date[nrow(data)])
  } else {
    subject <- paste("Normal:",data$log_name[1],data$date[nrow(data)])
  }

  data$date <- as.POSIXct(data$date)
  score.graph <- ggplot(data, aes(x=date,y=score,group=log_name)) +
      geom_line(aes(col=log_name))
  from <- sprintf("<changeAnomalyDetection@%s>", Sys.info()[4])
  to <- "<yokkuns@tkul.jp>"
  body <- list(mime_part(data),mime_part(score.graph))
  sendmail(from, to, subject, body)
}

main()

これで、change_anomaly_detection.R に標準入力でデータを渡せば、log名、日付、異常検知したい値、異常スコアが標準出力される。

hadoopコマンドでの実行

とりあえず、購買履歴のようなデータは、1レコードの大きさはほとんど変わらないと思われるので、まずはファイルサイズでトレンドに変化が無いかを見てみる。
急に大きくなっていれば、レコード数が増加を意味するので、何かが流行だしてる可能性があるとか、逆に減っている場合には、飽きられ始めてるかもとか、意外と全体の傾向が見えるので面白い。

hdfs_change_anomaly_detection.sh

hadoop fs -dusコマンドを使って、HDFS上のファイルサイズのリストを取得し、パイプでさっき作ったchange_anomaly_detection.Rに渡す。

#!/bin/bash

source ~/.bashrc

base_dir=$1
date_from=$2
date_to=$3
dates=`ruby -r date -e "Date.parse('${date_from}').upto(Date.parse('${date_to}')){|d|print d, ','}" | sed -e 's/,$//'`

hadoop fs -dus $base_dir/{$dates} | perl -pe 's|hdfs://(.+)$base_dir/||g' | awk '{print "hdfs-size\t"$2"\t"$1}' | ./change_anomaly_detection.R
  • usage
./hdfs_change_anomaly_detection.sh <base_dir> <date_from> <date_to>
  • 実行例
$ ./hdfs_change_anomaly_detection.sh /user/yokkuns/buying_history 2012-01-01 2012-04-09 | tail -n 30

hdfs-size 2012-03-21 1182 0.721664957812903
hdfs-size 2012-03-22 1179 0.553532151383951
hdfs-size 2012-03-23 1118 0.476977584156091
hdfs-size 2012-03-24 1101 0.405535686607921
hdfs-size 2012-03-25 1088 0.35572429299516
hdfs-size 2012-03-26 1054 1.28066195323186
hdfs-size 2012-03-27 1335 1.79544225093878
hdfs-size 2012-03-28 1439 1.91945745582947
hdfs-size 2012-03-29 1081 2.03207005627137
hdfs-size 2012-03-30 790 1.04783664240509
hdfs-size 2012-03-31 805 0.478221556963071
hdfs-size 2012-04-01 782 0.322057929907172
hdfs-size 2012-04-02 1000 0.198707064341006
hdfs-size 2012-04-03 811 0.205308947142793
hdfs-size 2012-04-04 1033 0.161568530679506
hdfs-size 2012-04-05 973 0.0915506267593984
hdfs-size 2012-04-06 1030 0.102608803858441
hdfs-size 2012-04-07 846 0.125225930603473
hdfs-size 2012-04-08 833 0.253216213846101
hdfs-size 2012-04-09 628 0.185198971252599

hiveでの実行

HDFS上のファイルサイズでの異常検知は、全体のレコード数の傾向は見れるものの、何が増えたのかとか、などの内部の傾向の変化は捉えられない。
そこで、hiveを使ってカテゴリ別の集計を行い、その結果をchange_anomaly_detection.Rに渡すといった方法も考えられる。

select
       category,
       to_date(from_unixtime(buy_time)),
       count(buying_id) as cnt
from
       buying_history
where
       to_date(from_unixtime(buy_time)) >= '$date_from'
       and to_date(from_unixtime(buy_time)) <= '$date_to'
group by
       category,to_date(from_unixtime(buy_time))
  • 実行

今回は、データを用意出来てないので、実行コマンドだけ。

$ hive -hiveconf date_from=2012-01-01 -hiveconf date_to=2012-04-09 -f category_buy_count.sql | ./change_anomaly_detection.R

hive + R で実行

hiveで集計した結果に、何らかの統計手法などを用いた結果に対して、異常検知を行いたいという事もあるかと思う。
その場合には、hiveで実行した結果を受けて、何らかの処理をした後標準出力するようなスクリプトを組めば、これまでと同様にパイプでつなげるだけで実行出来る

main <- function(argv){
  con <- file(description="stdin", open="r")
  data <- read.table(con, header=F, stringsAsFactors=F)
  close(con)

  ... 何らかの処理 ...

  write.table(data[,c("log_name","date","y")],row.names=F,quote=F,col.names=F)
}
  • 実行例

こちらもデータを用意出来てないので、実行コマンドだけ。

$ hive -hiveconf date_from=2012-01-01 -hiveconf date_to=2012-04-09 -f category_buy_count.sql | ./hogehoge.R | ./change_anomaly_detection.R