<?php

	/*   
	 Copyright (c) 2012, Paul G Talaga
	 All rights reserved.
	 
	 Redistribution and use in source and binary forms, with or without
	 modification, are permitted provided that the following conditions are met:
	 * Redistributions of source code must retain the above copyright
	 notice, this list of conditions and the following disclaimer.
	 * Redistributions in binary form must reproduce the above copyright
	 notice, this list of conditions and the following disclaimer in the
	 documentation and/or other materials provided with the distribution.
	 
	 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
	 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
	 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
	 DISCLAIMED. IN NO EVENT SHALL Paul G Talaga BE LIABLE FOR ANY
	 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
	 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
     LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
	 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
	 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
	 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
	 */
    
	/*
	 tentacle.php - PHP Class implementing a MySQL middleware client, forwarding agent
		and caching system.
	 
	 Developed to enable a non-distributed web application to be distributed, it relies
	 on every database query call also providing a consistency time value.  This value
	 represents how stale the data may be for a read request, or how much time may be
	 allowed to elapse before a write query is applied.
	 
	 Using the consistency time value, tentacle can direct read queries to a local
	 database or query cache.  If some page is known to contain low consistency values
	 that cannot be satisfied locally, the entire HTTP query is forwarded with session
	 information to the central webserving/database location where it is handled.  In this
	 manner all HTTP queries are guaranteed to travel at most one round trip to the central server.
	 
	 Configurables in this class during initialization:
	 1.  Central HTTP IP
	 2.  Local MySQL database IP
	 3.  Central MySQL database IP
	 4.  Database name
	 5.  Database username
	 6.  Database password
	 7.  Persistent DB connection
	 8.  Local Memcache server IP
	 9.  Local temp file prefix (for queuing write queries)
	 10. Local executable to flush write queries
	 
	 Notes: APC is required for fast local storage.
			See below for required constants.
	 
	 */

// Feature switches
const TENT_CACHE_READS = true;   // Serve read queries through the memcache query cache?
const TENT_QUEUE_WRITES = true;  // Queue write queries to a local file on non-central nodes?
const TENT_PROXY_ALL = false;    // Forward every HTTP request to the central server?
// File locations
const DB_QUERY_SAVE_FILE = '/tmp/db.txt';      // debug log appended to by Tentacle::printQuery()
const FORWARD_LIST_FILE = '/tmp/forward.txt';  // serialized copy of the forwarding list

class Tentacle{
  private $init = false;
  // IP or Domain to send important requests to
  private $central_http = null;
  // DB info
  private $localDB = null;
  private $localDBhost = null;
  public $centralDB = null;
  private $centralDB_host = null;
  private $db_user = null;
  private $db_password = null;
  private $db_name = null;
  private $use_pconnect = null;
  // Memcache 
  public $memcache = null;
  private $memcache_servers = null;
  // Local exe info
  private $exe_location = null;
  private $local_file = null;
  // Forwarding list
  private $forwarding_list = null;
	// (the forwarding list can be prewarmed with an array of URI substrings where the constructor initialises it to an empty array)
  private $central = false;
  // Debug
  private $query_count = 0;
  
  /**
   * Records the connection settings, decides whether this node is the central
   * server, loads the forwarding list (non-central nodes only) and connects to
   * memcache. No database link is opened here; connectLocalDB() and
   * connectCentralDB() are called lazily by the query methods.
   *
   * Terminates the script via die() when the APC extension is not loaded.
   *
   * @param string $central_http            IP or domain of the central web server
   * @param string $local_db                host of the local MySQL database
   * @param string $central_db              host of the central MySQL database
   * @param string $db_name                 database name
   * @param string $db_user                 database username
   * @param string $db_password             database password
   * @param mixed  $db_pconnect             persistent-connection flag (compared loosely with 'true' when connecting)
   * @param mixed  $memcache_servers        server description(s), handed to connectMemcache()
   * @param string $local_temp_file_prefix  stored in $this->local_file
   * @param string $exe                     stored in $this->exe_location
   */
  function __construct($central_http, $local_db, $central_db, $db_name, $db_user, $db_password, $db_pconnect, $memcache_servers, $local_temp_file_prefix, $exe){
    if($this->init)return; // bail if already initialized
    // We require APC for fast local storage of state
    if(!extension_loaded('apc'))die('APC Module required for Tentacle, not found');
    $this->db_user = $db_user;
    $this->db_password = $db_password;
    $this->db_name = $db_name;
    $this->central_http = $central_http;
    $this->centralDB_host = $central_db;
    $this->use_pconnect = $db_pconnect;
    $this->local_file = $local_temp_file_prefix;
    $this->exe_location = $exe;

    // We are the central server when our own address equals the central HTTP address.
    // SERVER_ADDR may be absent (e.g. when not run under a web server).
    $server_addr = isset($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : null;
    $this->central = ($central_http == $server_addr);
    // On the central server the "local" database is the central one.
    $this->localDBhost = $this->central ? $central_db : $local_db;

    // If we're the central server don't worry about forwarding list
    if(!$this->central){
      // Preferred source is APC; anything that is not an array is treated as missing.
      $list = apc_exists('forwarding_list') ? apc_fetch('forwarding_list') : null;
      if(!is_array($list)){
        $list = array(); // nothing usable stored, start from scratch (will be hash map)
        // We really should pull the list from a memcache, but this will be easier
        if(file_exists(FORWARD_LIST_FILE)){
          $contents = file_get_contents(FORWARD_LIST_FILE);
          // NOTE(review): unserialize() on a file under /tmp trusts whoever can write
          // that file; consider a safer encoding.
          $stored = ($contents === false || $contents === '') ? false : unserialize($contents);
          if(is_array($stored))$list = $stored;
        }
        apc_store('forwarding_list',$list);
      }
      $this->forwarding_list = $list;
    }

    // connect memcache
    $this->memcache_servers = $memcache_servers;
    $this->connectMemcache($this->memcache_servers);
    $this->init = true;
  }
  
  function __destruct(){
    // Intentionally empty for now: the DB links opened by connectLocalDB() /
    // connectCentralDB() are not closed here.
    // Close connections to DB's TODO
    // (the call below stays commented out; no close() method is defined in this class)
    //$this->close();
  }
  

  
  /**
   * Forwards the current HTTP request to the central server when its URI contains
   * one of the forwarding-list entries (or TENT_PROXY_ALL is set), relays the
   * central response to the client and ends the request with tep_exit().
   *
   * This function should be called in the application before ANY response is sent
   * to the client. Otherwise custom HTTP headers can't be sent!
   *
   * Returns without doing anything on the central server, or when the URI does not
   * match. If the central server cannot be reached or its reply cannot be decoded,
   * a 502 status is sent and the request is ended instead of relaying garbage.
   */
  function doForward(){
    if($this->central){return;} // don't forward to yourself!

    // To speed matching we take the requested URI and see if one of the stored
    // entries in the forwarding list is a substring; if so, we forward.
    $uri = $_SERVER['REQUEST_URI'];
    $trip = false;
    if(is_array($this->forwarding_list)){
      foreach($this->forwarding_list as $find){
        // skip empty needles: strpos() treats them differently across PHP versions
        if(is_string($find) && $find !== '' && strpos($uri,$find) !== FALSE){
          $trip = true;
          break; // one match is enough
        }
      }
    }
    if(!$trip && !TENT_PROXY_ALL)return;

    // get session & headers
    $headers = apache_request_headers();
    $sessid = tep_session_id();
    // get the session info from behind PHP's back!
    $sessData = $this->memcache->get($sessid);

    // what kind of request was it?
    $rtype = $_SERVER['REQUEST_METHOD'];
    $postvars = $_POST;
    // package up the request including headers, type, and session info
    $tosend = array($uri,$headers,$rtype,$postvars,array(array('key' => $sessid,'value' => $sessData)));
    $tosend = gzcompress(serialize($tosend));
    // Forward packaged request to central /remoteaccess.php
    // Eventually security should be added!
    $reply = $this->url_request('POST','http://' . $this->central_http . '/remoteaccess.php',array('r' => $tosend));

    // Unpackage response: expected to be a gzcompressed, serialized array of
    // (body, header text, session list).
    // NOTE(review): unserialize() on data received over the network is unsafe
    // unless the central server is fully trusted.
    $payload = false;
    if(is_array($reply) && isset($reply['content']) && $reply['content'] !== ''){
      // return values are checked explicitly, so the warnings are suppressed to
      // keep them out of the output (which would also block header())
      $raw = @gzuncompress($reply['content']);
      if($raw !== false)$payload = @unserialize($raw);
    }
    if(!is_array($payload) || count($payload) < 3){
      header('HTTP/1.1 502 Bad Gateway');
      tep_exit();
      return;
    }
    list($response,$responseheaders,$newsession) = $payload;

    // Update session state
    if(isset($_SESSION) && is_array($_SESSION)){
      foreach (array_keys($_SESSION) as $key){
        unset($_SESSION[$key]);
      }
    }
    if(is_array($newsession)){
      foreach($newsession as $s){
        session_decode($s['value']);
      }
    }
    // Set response headers; anything mentioning chunked encoding is dropped
    // because the body is sent in one piece below.
    foreach(explode("\n",(string)$responseheaders) as $line){
      $line = trim($line);
      if($line === '' || stripos($line,'chunked') !== FALSE)continue;
      header($line);
    }
    // Send the response!
    echo $response;
    tep_exit();
  }
  
  /**
   * Returns a MySQL link: the local link when one is open, otherwise an
   * already-open central link, otherwise a newly opened local link.
   */
  function getLocalResource(){
    // Nothing open yet: connect to the local database and hand that link back.
    if($this->localDB == null && $this->centralDB == null){
      $this->connectLocalDB();
      return $this->localDB;
    }
    // At least one link exists; the local one takes precedence.
    return ($this->localDB != null) ? $this->localDB : $this->centralDB;
  }
  
  /**
   * Entry point for all MySQL queries in the application.
   *
   * @param int|float $delay_time  consistency time in seconds: how stale a read may
   *                               be / how long a write may wait. Values <= 0 on a
   *                               non-central node add the current URI to the
   *                               forwarding list.
   * @param string    $query       SQL text
   * @return mixed  for writes: 1 when the query was queued locally, otherwise the
   *                mysql_query() result from the central DB; for reads: the
   *                getReadQueryWCache() result when caching applies, otherwise the
   *                mysql_query() result from the local DB.
   */
  function query($delay_time, $query){
    $pageId = $_SERVER['REQUEST_URI'];
    $this->printQuery($delay_time, $query); // Logs query if needed
    if($delay_time <= 0 && !$this->central){ // need immediate, go to central!
      // TODO: need to dynamically change the above  based on replication lag
      if(!is_array($this->forwarding_list))$this->forwarding_list = array();
      // Only record (and persist) a URI once; the list would otherwise grow by
      // one entry per zero-delay query.
      if(!in_array($pageId,$this->forwarding_list,true)){
        $this->forwarding_list[] = $pageId;
        $this->updateForwardingList();
      }
    }

    // Decide if read or write. A query is a write when it starts with a write verb
    // (so "INSERT ... SELECT" is not mistaken for a read) or does not mention
    // SELECT at all; the match is case-insensitive so "SELECT" counts as a read.
    $is_write = (preg_match('/^\s*(insert|update|delete|replace|create|alter|drop|truncate)\b/i',$query) === 1)
             || (stripos($query,'select') === FALSE);

    if($is_write){
      $queued = false;
      // Writes are queued regardless of $delay_time, which keeps them in issue order.
      if(TENT_QUEUE_WRITES && !$this->central){
        // the write query log is rotated; /tmp/toUse.txt names the current file
        $myi = '';
        $handle = @fopen('/tmp/toUse.txt', "r");
        if($handle !== false){
          $line = fgets($handle, 4096);
          fclose($handle);
          if($line !== false)$myi = rtrim($line);
        }
        $entry = serialize(array('now' => time(), 'query' => $query)) . "\n";
        // append under an exclusive lock so concurrent requests cannot interleave entries
        $queued = (@file_put_contents('/tmp/queries' . $myi . '.txt', $entry, FILE_APPEND | LOCK_EX) !== false);
      }
      if($queued){
        $result = 1;
      }else{
        // Either queueing is off, we are central, or the queue file could not be
        // written: apply the write directly rather than silently losing it.
        if($this->centralDB == null)$this->connectCentralDB();
        $result = mysql_query($query, $this->centralDB);
      }
    }else{// read
      if(TENT_CACHE_READS && $delay_time > 1){
        $result = $this->getReadQueryWCache($delay_time,$query);
      }else{
        if($this->localDB == null)$this->connectLocalDB();
        $result = mysql_query($query, $this->localDB);
      }
    }

    return $result;
  }
  
  /**
   * Runs a read query through the memcache query cache.
   *
   * Cache entries are stored under hashMe($query) as array(expire_time, rows) with
   * a memcache lifetime of $delay_time seconds. The expire time is embedded so a
   * reader can tell how much time is left (intended for a probabilistic early
   * refresh that mitigates the thundering herd problem; that refresh is currently
   * disabled pending verification).
   *
   * @param int|float $delay_time  allowed staleness in seconds
   * @param string    $query       SQL text
   * @return mixed  on a miss: the mysql_query() result (false when the query
   *                failed; failures are not cached). On a hit: the placeholder
   *                array('pointer' => 0, 'rows' => 0), with the cached rows
   *                published through the globals $db_cache / $db_pointer.
   */
  function getReadQueryWCache($delay_time,$query){
    global $db_pointer;
    global $db_cache;
    global $cache_debug;
    $key = $this->hashMe($query);
    $result = $this->memcache->get($key);
    // Anything that is not a well-formed entry (including FALSE) is a miss.
    $usable = is_array($result) && array_key_exists(0,$result) && array_key_exists(1,$result);
    if(!$usable){
      if($this->localDB == null)$this->connectLocalDB();
      $r = mysql_query($query, $this->localDB);
      $cache_debug['miss'] = isset($cache_debug['miss']) ? $cache_debug['miss'] + 1 : 1;
      // A failed query must not be cached as an empty result set.
      if($r === false)return $r;
      $result_array = $this->getArrayfromResource($r);
      $this->memcache->set($key,array($delay_time + time(),$result_array),0,$delay_time);
      return $r;
    }
    $cache_debug['hit'] = isset($cache_debug['hit']) ? $cache_debug['hit'] + 1 : 1;
    // found!
    $expire_time = $result[0];
    $returned = $result[1];
    $left = $expire_time - time();
    $preempt_refresh = false;  // Disabled pending verification
    if($preempt_refresh && $left < 60){
      // preempt update! During the last minute we probabilistically refresh
      // (chance grows as the time left shrinks).
      if($left < 0 || (1000 / ($left + 1) > rand(1,1000))){
        if($this->localDB == null)$this->connectLocalDB();
        $r = mysql_query($query, $this->localDB);
        if($r !== false){
          // same key and entry layout as the miss path above
          $this->memcache->set($key,array($delay_time + time(),$this->getArrayfromResource($r)),0,$delay_time);
          return $r;
        }
      }
    }
    // Hand the cached rows to the caller through the globals.
    $db_pointer = 0;
    $db_cache = $returned;
    return array('pointer' => 0, 'rows' => 0);
  }
   
  /**
   * Builds the 64-character cache key for a string: the MD5 of the input
   * followed by the MD5 of the input concatenated with itself.
   */
  function hashMe($pre){
    $single = md5($pre);
    $doubled = md5($pre . $pre);
    return $single . $doubled;
  }
  
  /**
   * Copies every row of a MySQL result into a list of associative arrays and,
   * when at least one row was read, rewinds the result to its first row so it
   * can be fetched again by the caller.
   *
   * @param resource $r  result handle as returned by mysql_query()
   * @return array  zero-indexed list of rows (empty when the result has no rows)
   */
  function getArrayfromResource($r){
    $rows = array();
    while($row = mysql_fetch_array($r,MYSQL_ASSOC)){
      $rows[] = $row;
    }
    if(count($rows) > 0){
      mysql_data_seek($r,0);
    }
    return $rows;
  }
  
  /**
   * Opens the link to the central MySQL server and selects the configured database.
   *
   * We don't automatically connect to the central database because usually we
   * don't have to, ever: once running, all 0 timed queries are eliminated and the
   * HTTP request is sent to central anyway.
   *
   * Terminates the script via die() when the connection cannot be made.
   */
  function connectCentralDB(){
    // a persistent link is used when the flag loosely equals the string 'true'
    $this->centralDB = ($this->use_pconnect == 'true')
      ? mysql_pconnect($this->centralDB_host, $this->db_user, $this->db_password)
      : mysql_connect($this->centralDB_host, $this->db_user, $this->db_password);
    if($this->centralDB){
      mysql_select_db($this->db_name,$this->centralDB);
      return;
    }
    die("Can't connect to (central) ". $this->centralDB_host ."!\n");
  }
  
    /**
     * Opens the link to the local MySQL server ($this->localDBhost, which the
     * constructor points at the central DB host when this node is the central
     * server) and selects the configured database.
     *
     * Terminates the script via die() when the connection cannot be made.
     */
    function connectLocalDB(){
      // a persistent link is used when the flag loosely equals the string 'true'
      $this->localDB = ($this->use_pconnect == 'true')
        ? mysql_pconnect($this->localDBhost, $this->db_user, $this->db_password)
        : mysql_connect($this->localDBhost, $this->db_user, $this->db_password);
      if($this->localDB){
        mysql_select_db($this->db_name,$this->localDB);
        return;
      }
      die("Can't connect to (local) ". $this->localDBhost ."!\n");
    }
  
  /**
   * Creates the MemcachePool and registers the configured server(s).
   *
   * @param mixed $memcache_servers  one of:
   *   - a list of server tuples: array(array(host, arg2, ... up to 6 values), ...)
   *   - a single server tuple:   array(host, arg2, ...)
   *   - a bare host string
   *   Each tuple's values (at most six) are passed positionally to
   *   MemcachePool::addServer(); values that are left out fall back to that
   *   method's own defaults.
   */
  function connectMemcache($memcache_servers){
    $this->memcache = new MemcachePool();

    // Normalise the three accepted shapes to a list of tuples. is_array() alone
    // cannot tell a single tuple from a list of tuples, so look at the first element.
    if(!is_array($memcache_servers)){
      $servers = array(array($memcache_servers));
    }else if(count($memcache_servers) > 0 && !is_array(reset($memcache_servers))){
      $servers = array($memcache_servers);
    }else{
      $servers = $memcache_servers;
    }

    foreach($servers as $s){
      if(!is_array($s))$s = array($s);
      // positional values only, capped at the six arguments the original passed
      $args = array_slice(array_values($s),0,6);
      if(count($args) == 0 || $args[0] === null || $args[0] === '')continue; // no host given
      call_user_func_array(array($this->memcache,'addServer'),$args);
    }
  }
  
  /**
   * Publishes the current forwarding list: stores it in APC under
   * 'forwarding_list' and writes a serialized copy to FORWARD_LIST_FILE.
   */
  function updateForwardingList(){
    apc_store('forwarding_list',$this->forwarding_list);
    // todo: convert to memcache
    $content = serialize($this->forwarding_list);
    // LOCK_EX makes file_put_contents() take the exclusive lock on the file it
    // opens itself, so concurrent requests cannot interleave their writes.
    // A failure here is reported by PHP as a warning and is not fatal: the APC
    // copy stored above has already been updated.
    file_put_contents(FORWARD_LIST_FILE,$content,LOCK_EX);
  }
  // Debug functions
  
  /**
   * Debug helper: appends one line per query to DB_QUERY_SAVE_FILE in the form
   * "<time>: <request uri>  -  <first 40 bytes of the query>".
   * Queries with a consistency time above 20 are not logged.
   * Terminates the script via die() when the log file cannot be opened.
   *
   * @param int|float $time   consistency time passed with the query
   * @param string    $query  SQL text
   */
  function printQuery($time, $query){
    if($time > 20)return;
    $log = fopen(DB_QUERY_SAVE_FILE,'a');
    if($log == FALSE){
      die('file open error DB_QUERY_SAVE_FILE');
    }
    $line = $time . ': ' . $_SERVER['REQUEST_URI'] . '  -  ' . substr($query,0,40) . "\n";
    // hold an exclusive lock for the duration of the write
    flock($log, LOCK_EX);
    fwrite($log, $line);
    flock($log, LOCK_UN);
    fclose($log);
  }
  /**
   * Performs a blocking HTTP/1.0 request over a raw socket.
   *
   * @param string $type     'POST' sends $data as a form-encoded body; anything
   *                         else is sent as GET (and $data is not transmitted)
   * @param string $url      absolute http:// URL; port and query string are honoured
   * @param array  $data     form fields, encoded with http_build_query()
   * @param mixed  $headers  optional array of extra request headers (name => value);
   *                         a Content-Length entry is ignored because it is computed here
   * @return array  array('status' => 'ok', 'header' => ..., 'content' => ...) on
   *                success, or array('status' => 'err', 'error' => ...) when the
   *                connection or the send fails.
   *
   * Terminates the script via die() for URLs that are not plain HTTP.
   */
  function url_request($type,$url, $data, $headers = '') {
      // Convert the data array into URL Parameters like a=b&foo=bar etc.
      $data = http_build_query($data);
      // parse the given URL
      $url = parse_url($url);
      if (!is_array($url) || !isset($url['scheme']) || $url['scheme'] != 'http') {
          die('Error: Only HTTP request are supported !');
      }
      // extract host, port and path (with query string); default to port 80 and "/"
      $host = isset($url['host']) ? $url['host'] : '';
      $port = isset($url['port']) ? (int)$url['port'] : 80;
      $path = (isset($url['path']) && $url['path'] !== '') ? $url['path'] : '/';
      if (isset($url['query']) && $url['query'] !== '') {
          $path .= '?' . $url['query'];
      }
      // open a socket connection - timeout: 30 sec
      $fp = fsockopen($host, $port, $errno, $errstr, 30);
      if (!$fp) {
          // nothing to close: the socket was never opened
          return array(
              'status' => 'err',
              'error' => "$errstr ($errno)"
          );
      }

      // build the request head
      $method = ($type == 'POST') ? 'POST' : 'GET';
      $request = "$method $path HTTP/1.0\r\n";
      $request .= 'Host: ' . $host . ($port != 80 ? ':' . $port : '') . "\r\n";
      if (is_array($headers)) {
          foreach ($headers as $k => $d) {
              if (strcasecmp((string)$k, 'Content-Length') != 0) {
                  $request .= "$k: $d\r\n";
              }
          }
      }
      if ($method == 'POST') {
          $request .= "Content-type: application/x-www-form-urlencoded\r\n";
          $request .= "Content-length: ". strlen($data) ."\r\n";
      }
      // The blank line ends the head for every method; without it a GET would
      // leave the server waiting for more headers.
      $request .= "Connection: close\r\n\r\n";
      if ($method == 'POST') {
          $request .= $data;
      }

      // a single fwrite() on a network stream may write only part of the buffer
      $total = strlen($request);
      $sent = 0;
      while ($sent < $total) {
          $n = fwrite($fp, substr($request, $sent));
          if ($n === false || $n === 0) {
              fclose($fp);
              return array(
                  'status' => 'err',
                  'error' => 'could not send request'
              );
          }
          $sent += $n;
      }

      // receive the results of the request
      $result = '';
      while (!feof($fp)) {
          $chunk = fgets($fp, 1024);
          if ($chunk === false) break; // read error or end of stream
          $result .= $chunk;
      }

      // close the socket connection:
      fclose($fp);

      // split the result header from the content
      $result = explode("\r\n\r\n", $result, 2);

      $header = isset($result[0]) ? $result[0] : '';
      $content = isset($result[1]) ? $result[1] : '';

      // return as structured array:
      return array(
          'status' => 'ok',
          'header' => $header,
          'content' => $content
      );
  }
}
?>
