iterable = Create::iterFor($iterable);

        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * @psalm-suppress InvalidNullableReturnType
     */
    public function promise()
    {
        if ($this->aggregate) {
            return $this->aggregate;
        }

        try {
            $this->createPromise();
            /** @psalm-assert Promise $this->aggregate */
            $this->iterable->rewind();
            $this->refillPending();
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
        } catch (\Exception $e) {
            $this->aggregate->reject($e);
        }

        /**
         * @psalm-suppress NullableReturnStatement
         * @phpstan-ignore-next-line
         */
        return $this->aggregate;
    }

    private function createPromise()
    {
        $this->mutex = false;
        $this->aggregate = new Promise(function () {
            if ($this->checkIfFinished()) {
                return;
            }
            reset($this->pending);
            // Consume a potentially fluctuating list of promises while
            // ensuring that indexes are maintained (precluding array_shift).
            while ($promise = current($this->pending)) {
                next($this->pending);
                $promise->wait();
                if (Is::settled($this->aggregate)) {
                    return;
                }
            }
        });

        // Clear the references when the promise is resolved.
        $clearFn = function () {
            $this->iterable = $this->concurrency = $this->pending = null;
            $this->onFulfilled = $this->onRejected = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($clearFn, $clearFn);
    }

    private function refillPending()
    {
        if (!$this->concurrency) {
            // Add all pending promises.
            while ($this->addPending() && $this->advanceIterator());

            return;
        }

        // Add only up to N pending promises.
        $concurrency = is_callable($this->concurrency)
            ? call_user_func($this->concurrency, count($this->pending))
            : $this->concurrency;
        $concurrency = max($concurrency - count($this->pending), 0);
        // Concurrency may be set to 0 to disallow new promises.
        if (!$concurrency) {
            return;
        }
        // Add the first pending promise.
        $this->addPending();
        // Note this is special handling for concurrency=1 so that we do
        // not advance the iterator after adding the first promise. This
        // helps work around issues with generators that might not have the
        // next value to yield until promise callbacks are called.
        while (--$concurrency
            && $this->advanceIterator()
            && $this->addPending());
    }

    private function addPending()
    {
        if (!$this->iterable || !$this->iterable->valid()) {
            return false;
        }

        $promise = Create::promiseFor($this->iterable->current());
        $key = $this->iterable->key();

        // Iterable keys may not be unique, so we use a counter to
        // guarantee uniqueness.
        $idx = $this->nextPendingIndex++;

        $this->pending[$idx] = $promise->then(
            function ($value) use ($idx, $key) {
                if ($this->onFulfilled) {
                    call_user_func(
                        $this->onFulfilled,
                        $value,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            },
            function ($reason) use ($idx, $key) {
                if ($this->onRejected) {
                    call_user_func(
                        $this->onRejected,
                        $reason,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            }
        );

        return true;
    }

    private function advanceIterator()
    {
        // Place a lock on the iterator so that we do not recurse,
        // preventing fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();
            $this->mutex = false;

            return true;
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
            $this->mutex = false;

            return false;
        } catch (\Exception $e) {
            $this->aggregate->reject($e);
            $this->mutex = false;

            return false;
        }
    }

    private function step($idx)
    {
        // If the promise was already resolved, then ignore this step.
        if (Is::settled($this->aggregate)) {
            return;
        }

        unset($this->pending[$idx]);

        // Only refill pending promises if we are not locked, preventing the
        // EachPromise from recursively invoking the provided iterator, which
        // causes a fatal error: "Cannot resume an already running generator"
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
            // Add more pending promises if possible.
            $this->refillPending();
        }
    }

    private function checkIfFinished()
    {
        if (!$this->pending && !$this->iterable->valid()) {
            // Resolve the promise if there's nothing left to do.
            $this->aggregate->resolve(null);

            return true;
        }

        return false;
    }
}