/
Client.php
131 lines (112 loc) · 3.98 KB
/
Client.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
<?php
/*
* This file is part of the FOSElasticaBundle package.
*
* (c) FriendsOfSymfony <http://friendsofsymfony.github.com/>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace FOS\ElasticaBundle\Elastica;
use Elastica\Client as BaseClient;
use Elastica\Exception\ClientException;
use Elastica\Request;
use FOS\ElasticaBundle\Logger\ElasticaLogger;
use Symfony\Component\Stopwatch\Stopwatch;
/**
* Extends the default Elastica client to provide logging for errors that occur
* during communication with ElasticSearch.
*
* @author Gordon Franke <info@nevalon.de>
*/
class Client extends BaseClient
{
/**
* Stores created indexes to avoid recreation.
*
* @var array
*/
private $indexCache = [];
/**
* Symfony's debugging Stopwatch.
*
* @var Stopwatch|null
*/
private $stopwatch;
/**
* {@inheritdoc}
*/
public function request($path, $method = Request::GET, $data = [], array $query = [], $contentType = Request::DEFAULT_CONTENT_TYPE)
{
if ($this->stopwatch) {
$this->stopwatch->start('es_request', 'fos_elastica');
}
$response = parent::request($path, $method, $data, $query, $contentType);
$responseData = $response->getData();
$transportInfo = $response->getTransferInfo();
$connection = $this->getLastRequest()->getConnection();
$forbiddenHttpCodes = $connection->hasConfig('http_error_codes') ? $connection->getConfig('http_error_codes') : [];
if (isset($transportInfo['http_code']) && in_array($transportInfo['http_code'], $forbiddenHttpCodes, true)) {
$body = is_array($responseData) ? json_encode($responseData) : $responseData;
$message = sprintf('Error in transportInfo: response code is %s, response body is %s', $transportInfo['http_code'], $body);
throw new ClientException($message);
}
if (isset($responseData['took']) && isset($responseData['hits'])) {
$this->logQuery($path, $method, $data, $query, $response->getQueryTime(), $response->getEngineTime(), $responseData['hits']['total']);
} else {
$this->logQuery($path, $method, $data, $query, $response->getQueryTime(), 0, 0);
}
if ($this->stopwatch) {
$this->stopwatch->stop('es_request');
}
return $response;
}
/**
* @param string $name
*
* @return Index|mixed
*/
public function getIndex($name)
{
if (isset($this->indexCache[$name])) {
return $this->indexCache[$name];
}
return $this->indexCache[$name] = new Index($this, $name);
}
/**
* Sets a stopwatch instance for debugging purposes.
*
* @param Stopwatch $stopwatch
*/
public function setStopwatch(Stopwatch $stopwatch = null)
{
$this->stopwatch = $stopwatch;
}
/**
* Log the query if we have an instance of ElasticaLogger.
*
* @param string $path
* @param string $method
* @param array|string $data
* @param array $query
* @param int $queryTime
* @param int $engineMS
* @param int $itemCount
*/
private function logQuery($path, $method, $data, array $query, $queryTime, $engineMS = 0, $itemCount = 0)
{
if (!$this->_logger or !$this->_logger instanceof ElasticaLogger) {
return;
}
$connection = $this->getLastRequest()->getConnection();
$connectionArray = [
'host' => $connection->getHost(),
'port' => $connection->getPort(),
'transport' => $connection->getTransport(),
'headers' => $connection->hasConfig('headers') ? $connection->getConfig('headers') : [],
];
/** @var ElasticaLogger $logger */
$logger = $this->_logger;
$logger->logQuery($path, $method, $data, $queryTime, $connectionArray, $query, $engineMS, $itemCount);
}
}