Aho-Corasick法による複数文字列(パターン)検索を試す

はじめに

Rabin-Karp法による複数文字列検索に続いて、同様に複数の文字列検索を行えるAC法を試しに書いてみた。

AhoCorasick法

  • えいほこらしっくほう
  • 文字列探索するときに、パターンマッチオートマトン(PMA)を使い、状態を遷移させながらO(n)でパターンマッチを行う方法
    • 入力文字列を一文字ずつ読み込みながらPMAの状態を遷移
    • PMAは与えられるパターンを表現する
      • ノードは状態、辺は対応する文字、を表す
  • PMA構築アルゴリズムは、
    • パターン文字列のTrieを作成
    • 根から幅優先探索で各ノードで遷移が失敗した場合の遷移先を決定
    • そこへ辺を張る、を繰り返す
失敗時の遷移先の決定
  • trieと違って、葉ノードまでたどりきったら終了ではなく、失敗したときに遷移するノードを決めておくことで、連続して探索を行える


(図1)

  • パターン文字列「ab」と「bcd」に対して、入力「abcde」を考える
    • 入力の最初から「a」「b」と読み込み、trieでは、葉ノードにたどり着く(遷移できない)
    • このとき、入力の2番目の「b」から再びtrieをたどることもよいが、すでに「b」は出てきている
    • したがって、trieの葉ノード(図1の右上のノード)の「b」で遷移失敗した場合は、根から「b」で遷移できるノード(図1の真ん中下のノード)に遷移すれば、やり直しをスキップできる
  • このように、trieに失敗時の遷移辺を追加することで繰り返しをなくせる
    • これが、読み込んだ入力までの接尾辞でパターンマッチする最長のもの=「根から最長のもののノード」への辺になっている


(図2)

  • この失敗時の遷移先を決める方法は、根からの幅優先探索で決めていけばよいというのが、この手法
  • あるノード(図2の赤ノード)にいる場合、
    • ある文字cで遷移できるノード(図2の緑ノード)の失敗時遷移辺を決める
      • 赤ノードはすでにfailure辺を適切に処理されているとする
    • 赤ノードで失敗した場合の遷移先ノードで同じ文字cで遷移できるならば、そこへ移動すれば遷移を続けられるので、そこへ緑ノードからfailure辺を張る
      • それでも同じ文字cで遷移できる辺がなければそのノードの失敗した場合の遷移先ノードで、これを繰り返す
      • すべて失敗した場合は、そこまででパターンマッチしたものがないということなので、根に戻る
    • これを帰納的に繰り返すことで、すべてのノードにfailure辺を張れる
      • 根で失敗したら、根へ自己ループ辺を張る
      • それ以外遷移できない場合は基本的に根へ辺を張る

コード

ということで、試してみる。
map使っているので、遅め。
2016/2/28追記:コメントで指摘していただいた通り、効率が悪いコードなので、スタックが足りなくなってセグフォる場合は、「AhoCorasick ac = new AhoCorasick(pat);」にしてみたり、メモリが足りなくなる場合は、add_failure_edge()の「//マッチするパターンを追加」の部分を修正するとよいかもしれません。

#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <queue>

class AhoCorasick {

  struct Node {
    std::map<char,Node> child; //遷移辺(!)
    std::vector<size_t> pattern; //マッチするパターン(のindex)
    Node* failure; //遷移失敗時の遷移先ノード
    Node():failure(NULL){}
  };
  Node root; //根

  //PMAのクリア
  void clear_graph(){
    root.child.clear();
    root.pattern.clear();
  }

  //Trieを構築
  void generate_trie(const std::vector<std::string>& patterns){
    size_t n = patterns.size();

    for(size_t i=0; i<n; i++){
      Node* t = &root;
      for(size_t j=0; j<patterns[i].length(); j++){
        t = &(t->child[patterns[i][j]]);
      }
      t->pattern.push_back(i);
    }
  }

  //failure辺を追加
  void add_failure_edge(){
    std::queue<Node*> que;
    que.push(&root);

    //幅優先探索で帰納的に失敗時の遷移辺を追加していく
    while(!que.empty()){
      Node* t = que.front(); que.pop();
      std::map<char,Node>::iterator itr = t->child.begin();

      for(; itr != t->child.end(); ++itr){
        que.push(&(itr->second));
        char c = itr->first;
        Node* node = &(itr->second); //文字cで遷移する頂点

        Node* anode = t->failure; //失敗したときの遷移先
        while(anode != NULL && anode->child.count(c)==0) anode = anode->failure;

        //遷移失敗時に続けられる別の頂点へ遷移
        if(anode == NULL){
          node->failure = &root;
        }else{
          node->failure = &(anode->child[c]);
        }

        //マッチするパターンを追加
        for(size_t i=0; i<node->failure->pattern.size(); i++){
          node->pattern.push_back(node->failure->pattern[i]);
        }
        //メモリ食いすぎ回避のため切り詰めておく
        std::vector<size_t>(node->pattern).swap(node->pattern);
        /* 2016/2/28追記:
          上記のように1つずつpush_back()していくと、メモリを無駄に使ってしまうかもしれません。

          試しに、パターンpat={a,aa,aaa,aaaa,aaaaa,...,(aが10000個)}のような入力を想定すると、
          1つずつpush_back()だと、node->patternのcapacity()は16384のものが並んでしまいますが、
          速度よりもメモリが問題になる場合は、速度を犠牲に切り詰めて保持するとよさそうです。(#include <algorithm>が必要)

          //swap技法とか
          std::vector<size_t>(node->pattern).swap(node->pattern);

          //set_unionするとか
          std::vector<size_t> buf;
          std::sort(node->pattern.begin(), node->pattern.end()); //いらないかも
          std::sort(node->failure->pattern.begin(), node->failure->pattern.end()); //いらないかも
          std::set_union(node->pattern.begin(), node->pattern.end(),
                         node->failure->pattern.begin(), node->failure->pattern.end(),
                         std::back_inserter(buf));
          node->pattern = buf;

          //copyとか
          std::vector<size_t> buf;
          std::copy(node->pattern.begin(), node->pattern.end(), std::back_inserter(buf));
          std::copy(node->failure->pattern.begin(), node->failure->pattern.end(), std::back_inserter(buf));
          node->pattern = buf;

          //そもそもvectorじゃなくてlistなどを使った方がよいか

         //capacity()で確保メモリサイズを確認
         std::cout << node->pattern.capacity() << std::endl;
        */
      }
    }
  }

  //Pattern Match Automatonを構築
  void make_PMA(const std::vector<std::string>& patterns){
    clear_graph();
    //trieを構築
    generate_trie(patterns);
    //failure辺を追加
    add_failure_edge();
  }
  
public:
  AhoCorasick(const std::vector<std::string>& patterns){
    make_PMA(patterns);
  }

  //文字列の探索
  std::vector< std::pair<size_t,size_t> > find(const std::string& text){
    std::vector< std::pair<size_t,size_t> > ret;
    Node* node = &root;
    
    //PMAにtextを入力
    for(size_t i=0; i<text.length(); i++){

      //遷移失敗するかどうか確認
      while(node != NULL && node->child.count(text[i])==0) node = node->failure;

      //次の頂点へ移動
      if(node == NULL) node = &root;
      else node = &(node->child[text[i]]);

      //もしパターンが見つかったら保存
      for(size_t j=0; j<node->pattern.size(); j++){
        size_t e = i; //見つかった位置(パターンの文字列の最後の部分)
        size_t pi = node->pattern[j]; //見つかったパターンのindex
        ret.push_back(std::make_pair(e,pi));
      }
    }
    return ret;
  }
};

int main(){

  //探したいパターン
  std::vector<std::string> pat;
  pat.push_back("ab");
  pat.push_back("bcd");
  
  AhoCorasick ac(pat);

  //入力文
  std::string text = "abcde";

  //テキストの探索
  std::vector< std::pair<size_t,size_t> > ret = ac.find(text);

  //結果の出力
  for(size_t i=0; i<ret.size(); i++){
    size_t e = ret[i].first; //マッチパターンの文字列最後の位置
    size_t pi = ret[i].second; //マッチしたパターンのindex
    size_t s = e + 1 - pat[pi].length(); //マッチパターンの文字列最初の位置

    std::cout << "[" << pat[pi] << "] find at index " << s << std::endl;
  }

  return 0;
}

実行結果

[ab] find at index 0
[bcde] find at index 1

見つかった。