ElasticsearchLogstashHandler.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Bridge\Monolog\Handler;
  11. use Monolog\Formatter\FormatterInterface;
  12. use Monolog\Formatter\LogstashFormatter;
  13. use Monolog\Handler\AbstractHandler;
  14. use Monolog\Handler\FormattableHandlerTrait;
  15. use Monolog\Handler\ProcessableHandlerTrait;
  16. use Monolog\Logger;
  17. use Symfony\Component\HttpClient\HttpClient;
  18. use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
  19. use Symfony\Contracts\HttpClient\HttpClientInterface;
  20. /**
  21. * Push logs directly to Elasticsearch and format them according to Logstash specification.
  22. *
  23. * This handler talks directly to the HTTP interface of Elasticsearch. This
  24. * means it will slow down your application if Elasticsearch takes time to
  25. * answer, even though all HTTP calls are done asynchronously.
  26. *
  27. * In a development environment, it's fine to keep the default configuration:
  28. * for each log, an HTTP request will be made to push the log to Elasticsearch.
  29. *
  30. * In a production environment, it's highly recommended to wrap this handler
  31. * in a handler with buffering capabilities (like the FingersCrossedHandler, or
  32. * BufferHandler) in order to call Elasticsearch only once with a bulk push. For
  33. * even better performance and fault tolerance, a proper ELK (https://www.elastic.co/what-is/elk-stack)
  34. * stack is recommended.
  35. *
  36. * @author Grégoire Pineau <lyrixx@lyrixx.info>
  37. */
  38. class ElasticsearchLogstashHandler extends AbstractHandler
  39. {
  40. use FormattableHandlerTrait;
  41. use ProcessableHandlerTrait;
  42. private $endpoint;
  43. private $index;
  44. private $client;
  45. private $responses;
  46. /**
  47. * @param string|int $level The minimum logging level at which this handler will be triggered
  48. */
  49. public function __construct(string $endpoint = 'http://127.0.0.1:9200', string $index = 'monolog', HttpClientInterface $client = null, $level = Logger::DEBUG, bool $bubble = true)
  50. {
  51. if (!interface_exists(HttpClientInterface::class)) {
  52. throw new \LogicException(sprintf('The "%s" handler needs an HTTP client. Try running "composer require symfony/http-client".', __CLASS__));
  53. }
  54. parent::__construct($level, $bubble);
  55. $this->endpoint = $endpoint;
  56. $this->index = $index;
  57. $this->client = $client ?: HttpClient::create(['timeout' => 1]);
  58. $this->responses = new \SplObjectStorage();
  59. }
  60. public function handle(array $record): bool
  61. {
  62. if (!$this->isHandling($record)) {
  63. return false;
  64. }
  65. $this->sendToElasticsearch([$record]);
  66. return !$this->bubble;
  67. }
  68. public function handleBatch(array $records): void
  69. {
  70. $records = array_filter($records, [$this, 'isHandling']);
  71. if ($records) {
  72. $this->sendToElasticsearch($records);
  73. }
  74. }
  75. protected function getDefaultFormatter(): FormatterInterface
  76. {
  77. // Monolog 1.X
  78. if (\defined(LogstashFormatter::class.'::V1')) {
  79. return new LogstashFormatter('application', null, null, 'ctxt_', LogstashFormatter::V1);
  80. }
  81. // Monolog 2.X
  82. return new LogstashFormatter('application');
  83. }
  84. private function sendToElasticsearch(array $records)
  85. {
  86. $formatter = $this->getFormatter();
  87. $body = '';
  88. foreach ($records as $record) {
  89. foreach ($this->processors as $processor) {
  90. $record = $processor($record);
  91. }
  92. $body .= json_encode([
  93. 'index' => [
  94. '_index' => $this->index,
  95. '_type' => '_doc',
  96. ],
  97. ]);
  98. $body .= "\n";
  99. $body .= $formatter->format($record);
  100. $body .= "\n";
  101. }
  102. $response = $this->client->request('POST', $this->endpoint.'/_bulk', [
  103. 'body' => $body,
  104. 'headers' => [
  105. 'Content-Type' => 'application/json',
  106. ],
  107. ]);
  108. $this->responses->attach($response);
  109. $this->wait(false);
  110. }
  111. public function __sleep()
  112. {
  113. throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
  114. }
  115. public function __wakeup()
  116. {
  117. throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
  118. }
  119. public function __destruct()
  120. {
  121. $this->wait(true);
  122. }
  123. private function wait(bool $blocking)
  124. {
  125. foreach ($this->client->stream($this->responses, $blocking ? null : 0.0) as $response => $chunk) {
  126. try {
  127. if ($chunk->isTimeout() && !$blocking) {
  128. continue;
  129. }
  130. if (!$chunk->isFirst() && !$chunk->isLast()) {
  131. continue;
  132. }
  133. if ($chunk->isLast()) {
  134. $this->responses->detach($response);
  135. }
  136. } catch (ExceptionInterface $e) {
  137. $this->responses->detach($response);
  138. error_log(sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e));
  139. }
  140. }
  141. }
  142. }