vendor/ruflin/elastica/src/Client.php line 517

Open in your IDE?
  1. <?php
  2. namespace Elastica;
  3. use Elastica\Bulk\Action;
  4. use Elastica\Bulk\ResponseSet;
  5. use Elastica\Exception\ClientException;
  6. use Elastica\Exception\ConnectionException;
  7. use Elastica\Exception\InvalidException;
  8. use Elastica\Exception\ResponseException;
  9. use Elastica\Script\AbstractScript;
  10. use Elasticsearch\Endpoints\AbstractEndpoint;
  11. use Elasticsearch\Endpoints\ClosePointInTime;
  12. use Elasticsearch\Endpoints\Indices\ForceMerge;
  13. use Elasticsearch\Endpoints\Indices\Refresh;
  14. use Elasticsearch\Endpoints\Update;
  15. use Psr\Log\LoggerInterface;
  16. use Psr\Log\NullLogger;
  17. /**
  18.  * Client to connect the the elasticsearch server.
  19.  *
  20.  * @author Nicolas Ruflin <spam@ruflin.com>
  21.  */
  22. class Client
  23. {
  24.     /**
  25.      * @var ClientConfiguration
  26.      */
  27.     protected $_config;
  28.     /**
  29.      * @var callable
  30.      */
  31.     protected $_callback;
  32.     /**
  33.      * @var Connection\ConnectionPool
  34.      */
  35.     protected $_connectionPool;
  36.     /**
  37.      * @var Request|null
  38.      */
  39.     protected $_lastRequest;
  40.     /**
  41.      * @var Response|null
  42.      */
  43.     protected $_lastResponse;
  44.     /**
  45.      * @var LoggerInterface
  46.      */
  47.     protected $_logger;
  48.     /**
  49.      * @var string
  50.      */
  51.     protected $_version;
  52.     /**
  53.      * Creates a new Elastica client.
  54.      *
  55.      * @param array|string  $config   OPTIONAL Additional config or DSN of options
  56.      * @param callable|null $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
  57.      *
  58.      * @throws InvalidException
  59.      */
  60.     public function __construct($config = [], ?callable $callback null, ?LoggerInterface $logger null)
  61.     {
  62.         if (\is_string($config)) {
  63.             $configuration ClientConfiguration::fromDsn($config);
  64.         } elseif (\is_array($config)) {
  65.             $configuration ClientConfiguration::fromArray($config);
  66.         } else {
  67.             throw new InvalidException('Config parameter must be an array or a string.');
  68.         }
  69.         $this->_config $configuration;
  70.         $this->_callback $callback;
  71.         $this->_logger $logger ?? new NullLogger();
  72.         $this->_initConnections();
  73.     }
  74.     /**
  75.      * Get current version.
  76.      */
  77.     public function getVersion(): string
  78.     {
  79.         if ($this->_version) {
  80.             return $this->_version;
  81.         }
  82.         $data $this->request('/')->getData();
  83.         return $this->_version $data['version']['number'];
  84.     }
  85.     /**
  86.      * Sets specific config values (updates and keeps default values).
  87.      *
  88.      * @param array $config Params
  89.      */
  90.     public function setConfig(array $config): self
  91.     {
  92.         foreach ($config as $key => $value) {
  93.             $this->_config->set($key$value);
  94.         }
  95.         return $this;
  96.     }
  97.     /**
  98.      * Returns a specific config key or the whole config array if not set.
  99.      *
  100.      * @throws InvalidException if the given key is not found in the configuration
  101.      *
  102.      * @return array|bool|string
  103.      */
  104.     public function getConfig(string $key '')
  105.     {
  106.         return $this->_config->get($key);
  107.     }
  108.     /**
  109.      * Sets / overwrites a specific config value.
  110.      *
  111.      * @param mixed $value Value
  112.      */
  113.     public function setConfigValue(string $key$value): self
  114.     {
  115.         return $this->setConfig([$key => $value]);
  116.     }
  117.     /**
  118.      * @param array|string $keys    config key or path of config keys
  119.      * @param mixed        $default default value will be returned if key was not found
  120.      *
  121.      * @return mixed
  122.      */
  123.     public function getConfigValue($keys$default null)
  124.     {
  125.         $value $this->_config->getAll();
  126.         foreach ((array) $keys as $key) {
  127.             if (isset($value[$key])) {
  128.                 $value $value[$key];
  129.             } else {
  130.                 return $default;
  131.             }
  132.         }
  133.         return $value;
  134.     }
  135.     /**
  136.      * Returns the index for the given connection.
  137.      */
  138.     public function getIndex(string $name): Index
  139.     {
  140.         return new Index($this$name);
  141.     }
  142.     /**
  143.      * Adds a HTTP Header.
  144.      */
  145.     public function addHeader(string $headerstring $value): self
  146.     {
  147.         if ($this->_config->has('headers')) {
  148.             $headers $this->_config->get('headers');
  149.         } else {
  150.             $headers = [];
  151.         }
  152.         $headers[$header] = $value;
  153.         $this->_config->set('headers'$headers);
  154.         return $this;
  155.     }
  156.     /**
  157.      * Remove a HTTP Header.
  158.      */
  159.     public function removeHeader(string $header): self
  160.     {
  161.         if ($this->_config->has('headers')) {
  162.             $headers $this->_config->get('headers');
  163.             unset($headers[$header]);
  164.             $this->_config->set('headers'$headers);
  165.         }
  166.         return $this;
  167.     }
  168.     /**
  169.      * Uses _bulk to send documents to the server.
  170.      *
  171.      * Array of \Elastica\Document as input. Index has to be set inside the
  172.      * document, because for bulk settings documents, documents can belong to
  173.      * any index
  174.      *
  175.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  176.      *
  177.      * @param array|Document[] $docs Array of Elastica\Document
  178.      *
  179.      * @throws InvalidException If docs is empty
  180.      */
  181.     public function updateDocuments(array $docs, array $requestParams = []): ResponseSet
  182.     {
  183.         if (!$docs) {
  184.             throw new InvalidException('Array has to consist of at least one element');
  185.         }
  186.         $bulk = new Bulk($this);
  187.         $bulk->addDocuments($docsAction::OP_TYPE_UPDATE);
  188.         foreach ($requestParams as $key => $value) {
  189.             $bulk->setRequestParam($key$value);
  190.         }
  191.         return $bulk->send();
  192.     }
  193.     /**
  194.      * Uses _bulk to send documents to the server.
  195.      *
  196.      * Array of \Elastica\Document as input. Index has to be set inside the
  197.      * document, because for bulk settings documents, documents can belong to
  198.      * any index
  199.      *
  200.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  201.      *
  202.      * @param array|Document[] $docs Array of Elastica\Document
  203.      *
  204.      * @throws InvalidException If docs is empty
  205.      */
  206.     public function addDocuments(array $docs, array $requestParams = []): ResponseSet
  207.     {
  208.         if (!$docs) {
  209.             throw new InvalidException('Array has to consist of at least one element');
  210.         }
  211.         $bulk = new Bulk($this);
  212.         $bulk->addDocuments($docs);
  213.         foreach ($requestParams as $key => $value) {
  214.             $bulk->setRequestParam($key$value);
  215.         }
  216.         return $bulk->send();
  217.     }
  218.     /**
  219.      * Update document, using update script. Requires elasticsearch >= 0.19.0.
  220.      *
  221.      * @param int|string                    $id      document id
  222.      * @param AbstractScript|array|Document $data    raw data for request body
  223.      * @param string                        $index   index to update
  224.      * @param array                         $options array of query params to use for query. For possible options check es api
  225.      *
  226.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
  227.      */
  228.     public function updateDocument($id$data$index, array $options = []): Response
  229.     {
  230.         $endpoint = new Update();
  231.         $endpoint->setId($id);
  232.         $endpoint->setIndex($index);
  233.         if ($data instanceof AbstractScript) {
  234.             $requestData $data->toArray();
  235.         } elseif ($data instanceof Document) {
  236.             $requestData = ['doc' => $data->getData()];
  237.             if ($data->getDocAsUpsert()) {
  238.                 $requestData['doc_as_upsert'] = true;
  239.             }
  240.             $docOptions $data->getOptions(
  241.                 [
  242.                     'consistency',
  243.                     'parent',
  244.                     'percolate',
  245.                     'refresh',
  246.                     'replication',
  247.                     'retry_on_conflict',
  248.                     'routing',
  249.                     'timeout',
  250.                 ]
  251.             );
  252.             $options += $docOptions;
  253.         } else {
  254.             $requestData $data;
  255.         }
  256.         // If an upsert document exists
  257.         if ($data instanceof AbstractScript || $data instanceof Document) {
  258.             if ($data->hasUpsert()) {
  259.                 $requestData['upsert'] = $data->getUpsert()->getData();
  260.             }
  261.         }
  262.         $endpoint->setBody($requestData);
  263.         $endpoint->setParams($options);
  264.         $response $this->requestEndpoint($endpoint);
  265.         if ($response->isOk()
  266.             && $data instanceof Document
  267.             && ($data->isAutoPopulate() || $this->getConfigValue(['document''autoPopulate'], false))
  268.         ) {
  269.             $data->setVersionParams($response->getData());
  270.         }
  271.         return $response;
  272.     }
  273.     /**
  274.      * Bulk deletes documents.
  275.      *
  276.      * @param array|Document[] $docs
  277.      *
  278.      * @throws InvalidException
  279.      */
  280.     public function deleteDocuments(array $docs, array $requestParams = []): ResponseSet
  281.     {
  282.         if (!$docs) {
  283.             throw new InvalidException('Array has to consist of at least one element');
  284.         }
  285.         $bulk = new Bulk($this);
  286.         $bulk->addDocuments($docsAction::OP_TYPE_DELETE);
  287.         foreach ($requestParams as $key => $value) {
  288.             $bulk->setRequestParam($key$value);
  289.         }
  290.         return $bulk->send();
  291.     }
  292.     /**
  293.      * Returns the status object for all indices.
  294.      *
  295.      * @return Status
  296.      */
  297.     public function getStatus()
  298.     {
  299.         return new Status($this);
  300.     }
  301.     /**
  302.      * Returns the current cluster.
  303.      *
  304.      * @return Cluster
  305.      */
  306.     public function getCluster()
  307.     {
  308.         return new Cluster($this);
  309.     }
  310.     /**
  311.      * Establishes the client connections.
  312.      */
  313.     public function connect()
  314.     {
  315.         $this->_initConnections();
  316.     }
  317.     /**
  318.      * @return $this
  319.      */
  320.     public function addConnection(Connection $connection)
  321.     {
  322.         $this->_connectionPool->addConnection($connection);
  323.         return $this;
  324.     }
  325.     /**
  326.      * Determines whether a valid connection is available for use.
  327.      *
  328.      * @return bool
  329.      */
  330.     public function hasConnection()
  331.     {
  332.         return $this->_connectionPool->hasConnection();
  333.     }
  334.     /**
  335.      * @throws ClientException
  336.      *
  337.      * @return Connection
  338.      */
  339.     public function getConnection()
  340.     {
  341.         return $this->_connectionPool->getConnection();
  342.     }
  343.     /**
  344.      * @return Connection[]
  345.      */
  346.     public function getConnections()
  347.     {
  348.         return $this->_connectionPool->getConnections();
  349.     }
  350.     /**
  351.      * @return \Elastica\Connection\Strategy\StrategyInterface
  352.      */
  353.     public function getConnectionStrategy()
  354.     {
  355.         return $this->_connectionPool->getStrategy();
  356.     }
  357.     /**
  358.      * @param array|Connection[] $connections
  359.      *
  360.      * @return $this
  361.      */
  362.     public function setConnections(array $connections)
  363.     {
  364.         $this->_connectionPool->setConnections($connections);
  365.         return $this;
  366.     }
  367.     /**
  368.      * Deletes documents with the given ids, index, type from the index.
  369.      *
  370.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  371.      *
  372.      * @param array        $ids     Document ids
  373.      * @param Index|string $index   Index name
  374.      * @param bool|string  $routing Optional routing key for all ids
  375.      *
  376.      * @throws InvalidException
  377.      */
  378.     public function deleteIds(array $ids$index$routing false): ResponseSet
  379.     {
  380.         if (!$ids) {
  381.             throw new InvalidException('Array has to consist of at least one id');
  382.         }
  383.         $bulk = new Bulk($this);
  384.         $bulk->setIndex($index);
  385.         foreach ($ids as $id) {
  386.             $action = new Action(Action::OP_TYPE_DELETE);
  387.             $action->setId($id);
  388.             if (!empty($routing)) {
  389.                 $action->setRouting($routing);
  390.             }
  391.             $bulk->addAction($action);
  392.         }
  393.         return $bulk->send();
  394.     }
  395.     /**
  396.      * Bulk operation.
  397.      *
  398.      * Every entry in the params array has to exactly on array
  399.      * of the bulk operation. An example param array would be:
  400.      *
  401.      * array(
  402.      *         array('index' => array('_index' => 'test', '_id' => '1')),
  403.      *         array('field1' => 'value1'),
  404.      *         array('delete' => array('_index' => 'test', '_id' => '2')),
  405.      *         array('create' => array('_index' => 'test', '_id' => '3')),
  406.      *         array('field1' => 'value3'),
  407.      *         array('update' => array('_id' => '1', '_index' => 'test')),
  408.      *         array('doc' => array('field2' => 'value2')),
  409.      * );
  410.      *
  411.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  412.      *
  413.      * @throws ResponseException
  414.      * @throws InvalidException
  415.      */
  416.     public function bulk(array $params): ResponseSet
  417.     {
  418.         if (!$params) {
  419.             throw new InvalidException('Array has to consist of at least one param');
  420.         }
  421.         $bulk = new Bulk($this);
  422.         $bulk->addRawData($params);
  423.         return $bulk->send();
  424.     }
  425.     /**
  426.      * Makes calls to the elasticsearch server based on this index.
  427.      *
  428.      * It's possible to make any REST query directly over this method
  429.      *
  430.      * @param string       $path        Path to call
  431.      * @param string       $method      Rest method to use (GET, POST, DELETE, PUT)
  432.      * @param array|string $data        OPTIONAL Arguments as array or pre-encoded string
  433.      * @param array        $query       OPTIONAL Query params
  434.      * @param string       $contentType Content-Type sent with this request
  435.      *
  436.      * @throws ClientException
  437.      * @throws ConnectionException
  438.      * @throws ResponseException
  439.      */
  440.     public function request(string $pathstring $method Request::GET$data = [], array $query = [], string $contentType Request::DEFAULT_CONTENT_TYPE): Response
  441.     {
  442.         $connection $this->getConnection();
  443.         $request $this->_lastRequest = new Request($path$method$data$query$connection$contentType);
  444.         $this->_lastResponse null;
  445.         try {
  446.             $response $this->_lastResponse $request->send();
  447.         } catch (ConnectionException $e) {
  448.             $this->_connectionPool->onFail($connection$e$this);
  449.             $this->_logger->error('Elastica Request Failure', [
  450.                 'exception' => $e,
  451.                 'request' => $e->getRequest()->toArray(),
  452.                 'retry' => $this->hasConnection(),
  453.             ]);
  454.             // In case there is no valid connection left, throw exception which caused the disabling of the connection.
  455.             if (!$this->hasConnection()) {
  456.                 throw $e;
  457.             }
  458.             return $this->request($path$method$data$query);
  459.         }
  460.         $this->_logger->debug('Elastica Request', [
  461.             'request' => $request->toArray(),
  462.             'response' => $response->getData(),
  463.             'responseStatus' => $response->getStatus(),
  464.         ]);
  465.         return $response;
  466.     }
  467.     /**
  468.      * Makes calls to the elasticsearch server with usage official client Endpoint.
  469.      */
  470.     public function requestEndpoint(AbstractEndpoint $endpoint): Response
  471.     {
  472.         return $this->request(
  473.             \ltrim($endpoint->getURI(), '/'),
  474.             $endpoint->getMethod(),
  475.             $endpoint->getBody() ?? [],
  476.             $endpoint->getParams()
  477.         );
  478.     }
  479.     /**
  480.      * Force merges all search indices.
  481.      *
  482.      * @param array $args OPTIONAL Optional arguments
  483.      *
  484.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
  485.      */
  486.     public function forcemergeAll($args = []): Response
  487.     {
  488.         $endpoint = new ForceMerge();
  489.         $endpoint->setParams($args);
  490.         return $this->requestEndpoint($endpoint);
  491.     }
  492.     /**
  493.      * Closes the given PointInTime.
  494.      *
  495.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html#close-point-in-time-api
  496.      */
  497.     public function closePointInTime(string $pointInTimeId): Response
  498.     {
  499.         $endpoint = new ClosePointInTime();
  500.         $endpoint->setBody(['id' => $pointInTimeId]);
  501.         return $this->requestEndpoint($endpoint);
  502.     }
  503.     /**
  504.      * Refreshes all search indices.
  505.      *
  506.      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
  507.      */
  508.     public function refreshAll(): Response
  509.     {
  510.         return $this->requestEndpoint(new Refresh());
  511.     }
  512.     public function getLastRequest(): ?Request
  513.     {
  514.         return $this->_lastRequest;
  515.     }
  516.     public function getLastResponse(): ?Response
  517.     {
  518.         return $this->_lastResponse;
  519.     }
  520.     /**
  521.      * Replace the existing logger.
  522.      *
  523.      * @return $this
  524.      */
  525.     public function setLogger(LoggerInterface $logger)
  526.     {
  527.         $this->_logger $logger;
  528.         return $this;
  529.     }
  530.     /**
  531.      * Inits the client connections.
  532.      */
  533.     protected function _initConnections(): void
  534.     {
  535.         $connections = [];
  536.         foreach ($this->getConfig('connections') as $connection) {
  537.             $connections[] = Connection::create($this->_prepareConnectionParams($connection));
  538.         }
  539.         if ($this->_config->has('servers')) {
  540.             $servers $this->_config->get('servers');
  541.             foreach ($servers as $server) {
  542.                 $connections[] = Connection::create($this->_prepareConnectionParams($server));
  543.             }
  544.         }
  545.         // If no connections set, create default connection
  546.         if (!$connections) {
  547.             $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
  548.         }
  549.         if (!$this->_config->has('connectionStrategy')) {
  550.             if (true === $this->getConfig('roundRobin')) {
  551.                 $this->setConfigValue('connectionStrategy''RoundRobin');
  552.             } else {
  553.                 $this->setConfigValue('connectionStrategy''Simple');
  554.             }
  555.         }
  556.         $strategy Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));
  557.         $this->_connectionPool = new Connection\ConnectionPool($connections$strategy$this->_callback);
  558.     }
  559.     /**
  560.      * Creates a Connection params array from a Client or server config array.
  561.      */
  562.     protected function _prepareConnectionParams(array $config): array
  563.     {
  564.         $params = [];
  565.         $params['config'] = [];
  566.         foreach ($config as $key => $value) {
  567.             if (\in_array($key, ['bigintConversion''curl''headers''url'])) {
  568.                 $params['config'][$key] = $value;
  569.             } else {
  570.                 $params[$key] = $value;
  571.             }
  572.         }
  573.         return $params;
  574.     }
  575. }