You are not logged in.

  • Login

Monday, April 11th 2011, 8:39pm

Tags

Apache, Facebook, flume, PHP, Thrift

Abstract

Hier wird das Setup eines Thrift RPC Servers auf Basis von PHP erläutert. Der Server wird den Austausch von strukturierten und unstrukturierten Daten erlauben.

Article

1. Einrichtung


Wir gehen davon aus, dass Apache Thrift als Voraussetzung installiert ist. Sollte es noch nicht installiert sein, finden sich hier Anleitungen zur Einrichtung von Thrift und der PHP Extension.

Wir verwenden die aktuellen Quellen aus dem Thrift Trunk, falls einige Klassen noch bei euch fehlen, dann gleicht diese bitte mit dem Beispiel Download am Ende dieser Anleitung ab.

2. Service Definition


Zu Beginn brauchen müssen wir einen Thrift Service definieren. Wir werden in diesem Beispiel die Thrift Definition von Flume verwenden.
Der Flume Client ist ein normaler Thrift Client mit, dessen Schema im Thrift Format beschrieben wurde.
Ihr könnt also auch jeden anderen Thrift Service implementieren.

Source code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct ThriftFlumeEvent {
  1: Timestamp timestamp,
  2: Priority priority,
  3: binary body,
  4: i64 nanos,
  5: string host,
  6: map<string,binary> fields
}

service ThriftFlumeEventServer {
  oneway void append( 1:ThriftFlumeEvent evt ),

  void close(), 
}


3. Code generieren


Mit dem Thrift Compiler erzeugen wir nun Client- und Serverdateien und verschieben diese anschließend in das packages Verzeichnis.

Bash

1
2
thrift --gen php:server flume.thrift
mv gen-php/ thrift/packages


4. Implementierung


Nun kommen wir zur Implementierung des Servercodes. Wir haben uns im Beispiel für den TForkingServer Server entschieden.
Ansonsten müssen wir nur das Interface implementieren. Wir werden den Server als Beispiel in der Konsole laufen lassen und die Ausgaben direkt mit Ausgaben monitorn können.

Wir haben den Server so implementiert, dass die angegebene Kategorie im Feld genutzt wird um die Methode zu bestimmen.

PHP Quellcode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?php
$GLOBALS['THRIFT_ROOT'] = './thrift';
 
require_once $GLOBALS['THRIFT_ROOT'] . '/Thrift.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/transport/TServerSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/transport/TTransportFactory.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinaryProtocol.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/server/TForkingServer.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/packages/flume/flume_types.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/packages/flume/ThriftFlumeEventServer.php';
 
/**
 * thrift server to handle flume events
 * @author	Torben Brodt <www.easy-coding.de>
 */
class FlumeHandler implements ThriftFlumeEventServerIf {
 
	/**
	 * public api call
	 * @implements ThriftFlumeEventServerIf
	 */
	public function append($evt) {
		$cat = $evt->fields['cat'];
 
		if(method_exists($this, $cat)) {
			$this->$cat($evt);
		} else {
			echo 'unknown method: '.$cat;
		}
	}
 
	/**
	 * public api call
	 * @implements ThriftFlumeEventServerIf
	 */
	public function close() {
		echo "close";
	}
 
	/**
	 * for debug purpose
	 */
	protected function ping($evt) {
		print_r($evt);
	}
}
 
$handler = new FlumeHandler();
$processor = new ThriftFlumeEventServerProcessor($handler);
 
// will run server on port 9090
$transport = new TServerSocket();
$outputTransportFactory = $inputTransportFactory = new TTransportFactory($transport);
$outputProtocolFactory = $inputProtocolFactory = new TBinaryProtocolFactory();
 
$server = new TForkingServer(
	$processor,
	$transport,
	$inputTransportFactory,
	$outputTransportFactory,
	$inputProtocolFactory,
	$outputProtocolFactory
);
 
header('Content-Type: application/x-thrift');
$server->serve();


5. Beispiel: Thrift zu Thrift


Wir starten den Server nun mit

Bash

1
php -f server.php


Den Sender aus dem Tutorial "Flume mit PHP" lassen wir direkt auf unser Thrift Service zeigen.
Dazu instanziieren wir den Socket mit Port 9090 und senden "ping" als Kategorie.

PHP Quellcode

1
2
3
4
5
6
7
$transport = new TSocket($host = 'localhost', $port = '9091');
...
$client->append(...
	'fields' => array(
		'cat' => 'ping'
	)
);


Sobald wir nun den Sender mit php -f sender.php aufrufen können wir in der Ausgabe des Servers sehen, dass der Event ankommt.

6. Beispiel: Thrift über Flume zu Thrift


Wir werden die Ausgaben nun in einer lokalen Anwendung puffern und darüber zu unserem Thrift Service übertragen.
Dazu benötigen wir ein Flume Setup wie es hier erläutert wird: Cloudera Flume Installation

Wir werden den Node über den Flume Master (http://localhost:35871/) wie folgt konfigurieren. Als Flume Sink verwenden wir den Server time.easy-coding.de wo der Flume Service läuft.
* rpcSource(9091)
* rpcSink("time.easy-coding.de", 9090)

Wir binden den Flume Node damit an einen lokalen Port 9091, der die Anfragen dann wiederum an den Remote Server weiterleitet.

7. Beispiel: Strukturierte Daten von Thrift über Flume zu Thrift


Um strukturierte Daten auszutauschen erstellen wir uns ein eigenes Thrift Schema "vector.thrift".

Source code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
namespace java easycoding.thrift
typedef i64 id

enum VectorTarget {
  THREAD = 0,
  WIKI = 1,
  BOARD = 2,
  BLOG = 3
}

enum Vector {
  BROWSER = 0,
  OS = 1,
  GEO_REGION = 2
}

struct VectorSequence {
  1: map<VectorTarget, id> targets,
  2: map<Vector, string> env,
  3: string query
}


Wir erzeugen die PHP Dateien und kopieren die Klassen in den packages Ordner

Bash

1
2
thrift --gen php vector.thrift
mv gen-php/ thrift/packages


Nun können wir mit folgendem Code das Event übertragen:

PHP Quellcode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<?php
$GLOBALS['THRIFT_ROOT'] = './thrift';
 
require_once $GLOBALS['THRIFT_ROOT'] . '/Thrift.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinaryProtocol.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/packages/flume/ThriftFlumeEventServer.php';
 
$transport = new TSocket($host = 'localhost', $port = '9091');
$protocol = new TBinaryProtocolAccelerated($transport);
$client = new ThriftFlumeEventServerClient($protocol, $protocol);
 
$transport->open();
$timestamp = intval(microtime(true) * 1000);
 
// log structed message
require_once $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinarySerializer.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/packages/vector/vector_types.php';
$vector = new VectorSequence();
$vector->query = 'thrift server example';
$vector->targets = array(
	VectorTarget::THREAD => 100,
	VectorTarget::BOARD => 10
);
$vector->env = array(
	Vector::BROWSER => 'Firefox 3.5',
	Vector::OS => 'Ubuntu 10.10',
	Vector::GEO_REGION => 'de-16', // for berlin
);
 
$body = TBinarySerializer::serialize($vector);
$client->append(new ThriftFlumeEvent(array(
	'priority' => Priority::INFO,
	'timestamp' => $timestamp,
	'body' => $body,
	'host' => 'server1',
	'fields' => array(
		'cat' => 'impression'
	)
)));
 
$transport->close();


Im Thrift Server Code fügen wir nun die Methode Impression ein, damit wir das Objekt nutzen können:

PHP Quellcode

1
2
3
4
5
6
7
8
9
10
/**
	 * unserialize thrift object
	 */
	protected function impression($evt) {
		require_once $GLOBALS['THRIFT_ROOT'] . '/packages/vector/vector_types.php';
		require_once $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinarySerializer.php';
 
		$instance = TBinarySerializer::deserialize($evt->body, 'VectorSequence');
		print_r($instance);
	}


8. Download


Ihr könnt euch den gesamten Quelltext der Beispiele hier herunterladen:
Torben Brodt has attached the following file:

Lexikon 4.1.5, developed by www.viecode.com