diff --git a/bin/test.sh b/bin/test.sh
index 4583e56f1..f392e616b 100755
--- a/bin/test.sh
+++ b/bin/test.sh
@@ -3,4 +3,4 @@
set -x
set -e
-docker-compose run --workdir="/mqdev" --rm dev ./docker/bin/test.sh "$@"
+docker-compose run --workdir="/mqdev" --rm dev ./docker/bin/test.sh $@
diff --git a/composer.json b/composer.json
index c6d19120f..d57b35757 100644
--- a/composer.json
+++ b/composer.json
@@ -33,7 +33,7 @@
"require-dev": {
"phpunit/phpunit": "^5.5",
"phpstan/phpstan": "^0.10",
- "queue-interop/queue-spec": "^0.5.5@dev",
+ "queue-interop/queue-spec": "^0.5.9@dev",
"symfony/browser-kit": "4.0.*",
"symfony/config": "4.0.*",
"symfony/process": "4.0.*",
@@ -73,13 +73,11 @@
"Enqueue\\Sqs\\": "pkg/sqs/",
"Enqueue\\Stomp\\": "pkg/stomp/",
"Enqueue\\Test\\": "pkg/test/",
+ "Enqueue\\Dsn\\": "pkg/dsn/",
"Enqueue\\": "pkg/enqueue/"
},
"exclude-from-classmap": [
"/Tests/"
- ],
- "files": [
- "pkg/enqueue/functions_include.php"
]
},
"autoload-dev": {
diff --git a/docs/transport/amqp.md b/docs/transport/amqp.md
index 9ff8e9900..7c073d407 100644
--- a/docs/transport/amqp.md
+++ b/docs/transport/amqp.md
@@ -63,9 +63,9 @@ $factory = new AmqpConnectionFactory([
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('amqp:');
-$psrContext = \Enqueue\dsn_to_context('amqp+ext:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp:')->createContext();
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp+ext:')->createContext();
```
## Declare topic.
diff --git a/docs/transport/amqp_bunny.md b/docs/transport/amqp_bunny.md
index 72dc3184d..241cfe836 100644
--- a/docs/transport/amqp_bunny.md
+++ b/docs/transport/amqp_bunny.md
@@ -53,9 +53,9 @@ $factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f');
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('amqp:');
-$psrContext = \Enqueue\dsn_to_context('amqp+bunny:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp:')->createContext();
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp+bunny:')->createContext();
```
## Declare topic.
diff --git a/docs/transport/amqp_lib.md b/docs/transport/amqp_lib.md
index faccda01c..6f749233b 100644
--- a/docs/transport/amqp_lib.md
+++ b/docs/transport/amqp_lib.md
@@ -61,9 +61,9 @@ $factory = new AmqpConnectionFactory([
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('amqp:');
-$psrContext = \Enqueue\dsn_to_context('amqp+lib:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp:')->createContext();
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp+lib:')->createContext();
```
## Declare topic.
diff --git a/docs/transport/dbal.md b/docs/transport/dbal.md
index 2d84782e1..9fcaab04a 100644
--- a/docs/transport/dbal.md
+++ b/docs/transport/dbal.md
@@ -49,8 +49,8 @@ $factory = new ManagerRegistryConnectionFactory($registry, [
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('mysql:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('mysql:')->createContext();
```
## Init database
diff --git a/docs/transport/filesystem.md b/docs/transport/filesystem.md
index 825b7b5a2..02154546f 100644
--- a/docs/transport/filesystem.md
+++ b/docs/transport/filesystem.md
@@ -48,8 +48,8 @@ $connectionFactory = new FsConnectionFactory([
$psrContext = $connectionFactory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('file:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('file:')->createContext();
```
## Send message to topic
diff --git a/docs/transport/gearman.md b/docs/transport/gearman.md
index 0161048d5..1baca30e0 100644
--- a/docs/transport/gearman.md
+++ b/docs/transport/gearman.md
@@ -39,8 +39,8 @@ $factory = new GearmanConnectionFactory([
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('gearman:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('gearman:')->createContext();
```
## Send message to topic
diff --git a/docs/transport/gps.md b/docs/transport/gps.md
index 7d0197f23..86269f9c5 100644
--- a/docs/transport/gps.md
+++ b/docs/transport/gps.md
@@ -32,8 +32,8 @@ $connectionFactory = new GpsConnectionFactory('gps:');
$psrContext = $connectionFactory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('gps:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('gps:')->createContext();
```
## Send message to topic
diff --git a/docs/transport/kafka.md b/docs/transport/kafka.md
index 77006f9d6..b2bb92e09 100644
--- a/docs/transport/kafka.md
+++ b/docs/transport/kafka.md
@@ -45,8 +45,8 @@ $connectionFactory = new RdKafkaConnectionFactory([
$psrContext = $connectionFactory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('kafka:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('kafka:')->createContext();
```
## Send message to topic
diff --git a/docs/transport/mongodb.md b/docs/transport/mongodb.md
index 315bb9bdb..0904c11ed 100644
--- a/docs/transport/mongodb.md
+++ b/docs/transport/mongodb.md
@@ -41,8 +41,8 @@ $factory = new MongodbConnectionFactory([
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('mongodb:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('mongodb:')->createContext();
```
## Send message to topic
diff --git a/docs/transport/pheanstalk.md b/docs/transport/pheanstalk.md
index 4371c2966..e0ee49b8b 100644
--- a/docs/transport/pheanstalk.md
+++ b/docs/transport/pheanstalk.md
@@ -39,8 +39,8 @@ $factory = new PheanstalkConnectionFactory([
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('beanstalk:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('beanstalk:')->createContext();
```
## Send message to topic
diff --git a/docs/transport/redis.md b/docs/transport/redis.md
index b1b715291..871a5d8d2 100644
--- a/docs/transport/redis.md
+++ b/docs/transport/redis.md
@@ -60,8 +60,8 @@ $factory = new RedisConnectionFactory('redis://example.com:1000?vendor=phpredis'
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('redis:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('redis:')->createContext();
// pass redis instance directly
$redis = new \Enqueue\Redis\PhpRedis([ /** redis connection options */ ]);
diff --git a/docs/transport/sqs.md b/docs/transport/sqs.md
index 6706fae8d..01ecc698d 100644
--- a/docs/transport/sqs.md
+++ b/docs/transport/sqs.md
@@ -38,8 +38,8 @@ $psrContext = $factory->createContext();
$client = new Aws\Sqs\SqsClient([ /* ... */ ]);
$factory = new SqsConnectionFactory($client);
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('sqs:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('sqs:')->createContext();
```
## Declare queue.
diff --git a/docs/transport/stomp.md b/docs/transport/stomp.md
index fb9ea7694..251d6b135 100644
--- a/docs/transport/stomp.md
+++ b/docs/transport/stomp.md
@@ -39,8 +39,8 @@ $factory = new StompConnectionFactory('stomp://example.com:1000?login=theLogin')
$psrContext = $factory->createContext();
-// if you have enqueue/enqueue library installed you can use a function from there to create the context
-$psrContext = \Enqueue\dsn_to_context('stomp:');
+// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
+$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('stomp:')->createContext();
```
## Send message to topic
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 8ef3a3059..10afc56e7 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -100,6 +100,10 @@
pkg/async-command/Tests
+
+
+ pkg/dsn/Tests
+
diff --git a/pkg/dbal/DbalConnectionFactory.php b/pkg/dbal/DbalConnectionFactory.php
index df79dda33..22b814340 100644
--- a/pkg/dbal/DbalConnectionFactory.php
+++ b/pkg/dbal/DbalConnectionFactory.php
@@ -92,51 +92,55 @@ private function establishConnection()
return $this->connection;
}
- /**
- * @param string $dsn
- *
- * @return array
- */
- private function parseDsn($dsn)
+ private function parseDsn(string $dsn): array
{
+ if (false === strpos($dsn, ':')) {
+ throw new \LogicException(sprintf('The DSN is invalid. It does not have scheme separator ":".'));
+ }
+
if (false === parse_url($dsn)) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}
- if (!preg_match('/^([0-9a-z_]+):(.+)?$/', $dsn, $matches)) {
- throw new \LogicException('Schema is empty');
+ list($scheme) = explode(':', $dsn, 2);
+
+ $scheme = strtolower($scheme);
+ if (false == preg_match('/^[a-z\d+-.]*$/', $scheme)) {
+ throw new \LogicException('The DSN is invalid. Scheme contains illegal symbols.');
}
- $schema = $matches[1];
$supported = [
- 'db2' => true,
- 'ibm_db2' => true,
- 'mssql' => true,
- 'pdo_sqlsrv' => true,
- 'mysql' => true,
- 'mysql2' => true,
- 'pdo_mysql' => true,
- 'pgsql' => true,
- 'postgres' => true,
- 'postgresql' => true,
- 'pdo_pgsql' => true,
- 'sqlite' => true,
- 'sqlite3' => true,
- 'pdo_sqlite' => true,
+ 'db2' => 'db2',
+ 'ibm-db2' => 'ibm-db2',
+ 'mssql' => 'mssql',
+ 'sqlsrv+pdo' => 'pdo_sqlsrv',
+ 'mysql' => 'mysql',
+ 'mysql2' => 'mysql2',
+ 'mysql+pdo' => 'pdo_mysql',
+ 'pgsql' => 'pgsql',
+ 'postgres' => 'postgres',
+ 'pgsql+pdo' => 'pdo_pgsql',
+ 'sqlite' => 'sqlite',
+ 'sqlite3' => 'sqlite3',
+ 'sqlite+pdo' => 'pdo_sqlite',
];
- if (false == isset($supported[$schema])) {
+ if (false == isset($supported[$scheme])) {
throw new \LogicException(sprintf(
'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
- $schema,
+ $scheme,
implode('", "', array_keys($supported))
));
}
+ $doctrineScheme = $supported[$scheme];
+
return [
'lazy' => true,
'connection' => [
- 'url' => $schema.':' === $dsn ? $schema.'://root@localhost' : $dsn,
+ 'url' => $scheme.':' === $dsn ?
+ $doctrineScheme.'://root@localhost' :
+ str_replace($scheme, $doctrineScheme, $dsn),
],
];
}
diff --git a/pkg/dbal/Tests/DbalConnectionFactoryConfigTest.php b/pkg/dbal/Tests/DbalConnectionFactoryConfigTest.php
index 32683a62d..2a8f17f52 100644
--- a/pkg/dbal/Tests/DbalConnectionFactoryConfigTest.php
+++ b/pkg/dbal/Tests/DbalConnectionFactoryConfigTest.php
@@ -24,7 +24,7 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig()
public function testThrowIfSchemeIsNotSupported()
{
$this->expectException(\LogicException::class);
- $this->expectExceptionMessage('The given DSN schema "http" is not supported. There are supported schemes: "db2", "ibm_db2", "mssql", "pdo_sqlsrv", "mysql", "mysql2", "pdo_mysql", "pgsql", "postgres", "postgresql", "pdo_pgsql", "sqlite", "sqlite3", "pdo_sqlite"');
+ $this->expectExceptionMessage('The given DSN schema "http" is not supported. There are supported schemes: "db2", "ibm-db2", "mssql", "sqlsrv+pdo", "mysql", "mysql2", "mysql+pdo", "pgsql", "postgres", "pgsql+pdo", "sqlite", "sqlite3", "sqlite+pdo".');
new DbalConnectionFactory('http://example.com');
}
@@ -32,7 +32,7 @@ public function testThrowIfSchemeIsNotSupported()
public function testThrowIfDsnCouldNotBeParsed()
{
$this->expectException(\LogicException::class);
- $this->expectExceptionMessage('Schema is empty');
+ $this->expectExceptionMessage('The DSN is invalid. It does not have scheme separator ":".');
new DbalConnectionFactory('invalidDSN');
}
@@ -78,7 +78,7 @@ public static function provideConfigs()
];
yield [
- 'pdo_mysql:',
+ 'mysql+pdo:',
[
'connection' => [
'url' => 'pdo_mysql://root@localhost',
@@ -114,7 +114,7 @@ public static function provideConfigs()
];
yield [
- 'pdo_mysql://user:pass@host:10001/db',
+ 'mysql+pdo://user:pass@host:10001/db',
[
'connection' => [
'url' => 'pdo_mysql://user:pass@host:10001/db',
diff --git a/pkg/dsn/.gitignore b/pkg/dsn/.gitignore
new file mode 100644
index 000000000..a770439e5
--- /dev/null
+++ b/pkg/dsn/.gitignore
@@ -0,0 +1,6 @@
+*~
+/composer.lock
+/composer.phar
+/phpunit.xml
+/vendor/
+/.idea/
diff --git a/pkg/dsn/.travis.yml b/pkg/dsn/.travis.yml
new file mode 100644
index 000000000..bc1ccd01c
--- /dev/null
+++ b/pkg/dsn/.travis.yml
@@ -0,0 +1,21 @@
+sudo: false
+
+git:
+ depth: 10
+
+language: php
+
+php:
+ - '7.1'
+ - '7.2'
+
+cache:
+ directories:
+ - $HOME/.composer/cache
+
+install:
+ - composer self-update
+ - composer install --prefer-source
+
+script:
+ - vendor/bin/phpunit
diff --git a/pkg/dsn/Dsn.php b/pkg/dsn/Dsn.php
new file mode 100644
index 000000000..48b117779
--- /dev/null
+++ b/pkg/dsn/Dsn.php
@@ -0,0 +1,228 @@
+dsn = $dsn;
+ $this->query = [];
+
+ $this->parse($dsn);
+ }
+
+ public function __toString(): string
+ {
+ return $this->dsn;
+ }
+
+ public function getDsn(): string
+ {
+ return $this->dsn;
+ }
+
+ public function getScheme(): string
+ {
+ return $this->scheme;
+ }
+
+ public function getSchemeProtocol(): string
+ {
+ return $this->schemeProtocol;
+ }
+
+ /**
+ * @return string[]
+ */
+ public function getSchemeExtensions(): array
+ {
+ return $this->schemeExtensions;
+ }
+
+ public function hasSchemeExtension(string $extension): bool
+ {
+ return in_array($extension, $this->schemeExtensions, true);
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getUser(): ?string
+ {
+ return $this->user;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getPassword(): ?string
+ {
+ return $this->password;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getHost(): ?string
+ {
+ return $this->host;
+ }
+
+ /**
+ * @return int|null
+ */
+ public function getPort(): ?int
+ {
+ return $this->port;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getPath(): ?string
+ {
+ return $this->path;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getQueryString(): ?string
+ {
+ return $this->queryString;
+ }
+
+ /**
+ * @return array
+ */
+ public function getQuery(): array
+ {
+ return $this->query;
+ }
+
+ public function getQueryParameter(string $name, $default = null)
+ {
+ return array_key_exists($name, $this->query) ? $this->query[$name] : $default;
+ }
+
+ public function toArray()
+ {
+ return [
+ 'scheme' => $this->scheme,
+ 'schemeProtocol' => $this->schemeProtocol,
+ 'schemeExtensions' => $this->schemeExtensions,
+ 'user' => $this->user,
+ 'password' => $this->password,
+ 'host' => $this->host,
+ 'port' => $this->port,
+ 'path' => $this->path,
+ 'queryString' => $this->queryString,
+ 'query' => $this->query,
+ ];
+ }
+
+ private function parse(string $dsn): void
+ {
+ if (false === strpos($dsn, ':')) {
+ throw new \LogicException(sprintf('The DSN is invalid. It does not have scheme separator ":".'));
+ }
+
+ list($scheme, $dsnWithoutScheme) = explode(':', $dsn, 2);
+
+ $scheme = strtolower($scheme);
+ if (false == preg_match('/^[a-z\d+-.]*$/', $scheme)) {
+ throw new \LogicException('The DSN is invalid. Scheme contains illegal symbols.');
+ }
+
+ $schemeParts = explode('+', $scheme);
+ $this->scheme = $scheme;
+ $this->schemeProtocol = $schemeParts[0];
+
+ unset($schemeParts[0]);
+ $this->schemeExtensions = array_values($schemeParts);
+
+ if ($host = parse_url($dsn, PHP_URL_HOST)) {
+ $this->host = $host;
+ }
+
+ if ($port = parse_url($dsn, PHP_URL_PORT)) {
+ $this->port = (int) $port;
+ }
+
+ if ($user = parse_url($dsn, PHP_URL_USER)) {
+ $this->user = $user;
+ }
+
+ if ($password = parse_url($dsn, PHP_URL_PASS)) {
+ $this->password = $password;
+ }
+
+ if ($path = parse_url($dsn, PHP_URL_PATH)) {
+ $this->path = $path;
+ }
+
+ if ($queryString = parse_url($dsn, PHP_URL_QUERY)) {
+ $this->queryString = $queryString;
+
+ $query = [];
+ parse_str($queryString, $query);
+ $this->query = $query;
+ }
+ }
+}
diff --git a/pkg/dsn/LICENSE b/pkg/dsn/LICENSE
new file mode 100644
index 000000000..3d2964f26
--- /dev/null
+++ b/pkg/dsn/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+Copyright (c) 2018 Kotliar Maksym
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/pkg/dsn/README.md b/pkg/dsn/README.md
new file mode 100644
index 000000000..397beaeca
--- /dev/null
+++ b/pkg/dsn/README.md
@@ -0,0 +1,20 @@
+# Enqueue. Parse DSN class
+
+## Resources
+
+* [Site](https://enqueue.forma-pro.com/)
+* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
+* [Questions](https://gitter.im/php-enqueue/Lobby)
+* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
+
+## Developed by Forma-Pro
+
+Forma-Pro is a full stack development company which interests also spread to open source development.
+Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience.
+Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability.
+
+If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com
+
+## License
+
+It is released under the [MIT License](LICENSE).
diff --git a/pkg/dsn/Tests/DsnTest.php b/pkg/dsn/Tests/DsnTest.php
new file mode 100644
index 000000000..b7e43f711
--- /dev/null
+++ b/pkg/dsn/Tests/DsnTest.php
@@ -0,0 +1,102 @@
+expectException(\LogicException::class);
+ $this->expectExceptionMessage('The DSN is invalid. It does not have scheme separator ":".');
+ new Dsn('foobar');
+ }
+
+ public function testThrowsIfSchemeContainsIllegalSymbols()
+ {
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('The DSN is invalid. Scheme contains illegal symbols.');
+ new Dsn('foo_&%&^bar://localhost');
+ }
+
+ /**
+ * @dataProvider provideSchemes
+ */
+ public function testShouldParseSchemeCorrectly(string $dsn, string $expectedScheme, string $expectedSchemeProtocol, array $expectedSchemeExtensions)
+ {
+ $dsn = new Dsn($dsn);
+
+ $this->assertSame($expectedScheme, $dsn->getScheme());
+ $this->assertSame($expectedSchemeProtocol, $dsn->getSchemeProtocol());
+ $this->assertSame($expectedSchemeExtensions, $dsn->getSchemeExtensions());
+ }
+
+ public function testShouldParseUser()
+ {
+ $dsn = new Dsn('amqp+ext://theUser:thePass@theHost:1267/thePath');
+
+ $this->assertSame('theUser', $dsn->getUser());
+ }
+
+ public function testShouldParsePassword()
+ {
+ $dsn = new Dsn('amqp+ext://theUser:thePass@theHost:1267/thePath');
+
+ $this->assertSame('thePass', $dsn->getPassword());
+ }
+
+ public function testShouldParseHost()
+ {
+ $dsn = new Dsn('amqp+ext://theUser:thePass@theHost:1267/thePath');
+
+ $this->assertSame('theHost', $dsn->getHost());
+ }
+
+ public function testShouldParsePort()
+ {
+ $dsn = new Dsn('amqp+ext://theUser:thePass@theHost:1267/thePath');
+
+ $this->assertSame(1267, $dsn->getPort());
+ }
+
+ public function testShouldParsePath()
+ {
+ $dsn = new Dsn('amqp+ext://theUser:thePass@theHost:1267/thePath');
+
+ $this->assertSame('/thePath', $dsn->getPath());
+ }
+
+ public function testShouldParseQuery()
+ {
+ $dsn = new Dsn('amqp+ext://theUser:thePass@theHost:1267/thePath?foo=fooVal&bar=bar%2fVal');
+
+ $this->assertSame('foo=fooVal&bar=bar%2fVal', $dsn->getQueryString());
+ $this->assertSame(['foo' => 'fooVal', 'bar' => 'bar/Val'], $dsn->getQuery());
+ }
+
+ public static function provideSchemes()
+ {
+ yield [':', '', '', []];
+
+ yield ['FOO:', 'foo', 'foo', []];
+
+ yield ['foo:', 'foo', 'foo', []];
+
+ yield ['foo+bar:', 'foo+bar', 'foo', ['bar']];
+
+ yield ['foo+bar+baz:', 'foo+bar+baz', 'foo', ['bar', 'baz']];
+
+ yield ['foo:?bar=barVal', 'foo', 'foo', []];
+
+ yield ['amqp+ext://guest:guest@localhost:5672/%2f', 'amqp+ext', 'amqp', ['ext']];
+
+ yield ['amqp+ext+rabbitmq:', 'amqp+ext+rabbitmq', 'amqp', ['ext', 'rabbitmq']];
+ }
+}
diff --git a/pkg/dsn/composer.json b/pkg/dsn/composer.json
new file mode 100644
index 000000000..6c2f38189
--- /dev/null
+++ b/pkg/dsn/composer.json
@@ -0,0 +1,33 @@
+{
+ "name": "enqueue/dsn",
+ "type": "library",
+ "description": "Parse DSN",
+ "keywords": ["dsn", "parse"],
+ "homepage": "https://enqueue.forma-pro.com/",
+ "license": "MIT",
+ "require": {
+ "php": "^7.1.3"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~5.4.0"
+ },
+ "support": {
+ "email": "opensource@forma-pro.com",
+ "issues": "https://github.com/php-enqueue/enqueue-dev/issues",
+ "forum": "https://gitter.im/php-enqueue/Lobby",
+ "source": "https://github.com/php-enqueue/enqueue-dev",
+ "docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md"
+ },
+ "autoload": {
+ "psr-4": { "Enqueue\\Dsn\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "0.9.x-dev"
+ }
+ }
+}
diff --git a/pkg/dsn/phpunit.xml.dist b/pkg/dsn/phpunit.xml.dist
new file mode 100644
index 000000000..21a14bbc3
--- /dev/null
+++ b/pkg/dsn/phpunit.xml.dist
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ .
+
+ ./vendor
+ ./Tests
+
+
+
+
diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml
index d5bacbb03..6066a7ab3 100644
--- a/pkg/enqueue-bundle/Resources/config/client.yml
+++ b/pkg/enqueue-bundle/Resources/config/client.yml
@@ -1,4 +1,10 @@
services:
+ enqueue.client.driver_factory:
+ class: 'Enqueue\Client\DriverFactory'
+ arguments:
+ - '@enqueue.client.config'
+ - '@enqueue.client.meta.queue_meta_registry'
+
enqueue.client.config:
class: 'Enqueue\Client\Config'
public: false
diff --git a/pkg/enqueue-bundle/Resources/config/services.yml b/pkg/enqueue-bundle/Resources/config/services.yml
index 189218f04..6b626fd30 100644
--- a/pkg/enqueue-bundle/Resources/config/services.yml
+++ b/pkg/enqueue-bundle/Resources/config/services.yml
@@ -3,6 +3,9 @@ parameters:
enqueue.queue_consumer.default_receive_timeout: 10000
services:
+ enqueue.connection_factory_factory:
+ class: 'Enqueue\ConnectionFactoryFactory'
+
enqueue.consumption.extensions:
class: 'Enqueue\Consumption\ChainExtension'
public: false
diff --git a/pkg/enqueue/Client/DriverFactory.php b/pkg/enqueue/Client/DriverFactory.php
new file mode 100644
index 000000000..a82ec938f
--- /dev/null
+++ b/pkg/enqueue/Client/DriverFactory.php
@@ -0,0 +1,124 @@
+config = $config;
+ $this->queueMetaRegistry = $queueMetaRegistry;
+ }
+
+ public function create(PsrConnectionFactory $factory, string $dsn, array $config): DriverInterface
+ {
+ $dsn = new Dsn($dsn);
+
+ if ($driverClass = $this->findDriverClass($dsn, Resources::getAvailableDrivers())) {
+ if (RabbitMqDriver::class === $driverClass) {
+ if (false == $factory instanceof AmqpConnectionFactory) {
+ throw new \LogicException(sprintf(
+ 'The factory must be instance of "%s", got "%s"',
+ AmqpConnectionFactory::class,
+ get_class($factory)
+ ));
+ }
+
+ return new RabbitMqDriver($factory->createContext(), $this->config, $this->queueMetaRegistry);
+ }
+
+ if (RabbitMqStompDriver::class === $driverClass) {
+ if (false == $factory instanceof StompConnectionFactory) {
+ throw new \LogicException(sprintf(
+ 'The factory must be instance of "%s", got "%s"',
+ StompConnectionFactory::class,
+ get_class($factory)
+ ));
+ }
+
+ if (isset($config['rabbitmq_management_dsn'])) {
+ $managementDsn = new Dsn($config['rabbitmq_management_dsn']);
+
+ $managementClient = ManagementClient::create(
+ ltrim($managementDsn->getPath(), '/'),
+ $managementDsn->getHost(),
+ $managementDsn->getPort(),
+ $managementDsn->getUser(),
+ $managementDsn->getPassword()
+ );
+ } else {
+ $managementClient = ManagementClient::create(
+ ltrim($dsn->getPath(), '/'),
+ $dsn->getHost(),
+ isset($config['management_plugin_port']) ? $config['management_plugin_port'] : 15672,
+ $dsn->getUser(),
+ $dsn->getPassword()
+ );
+ }
+
+ return new RabbitMqStompDriver($factory->createContext(), $this->config, $this->queueMetaRegistry, $managementClient);
+ }
+
+ return new $driverClass($factory->createContext(), $this->config, $this->queueMetaRegistry);
+ }
+
+ $knownDrivers = Resources::getKnownDrivers();
+ if ($driverClass = $this->findDriverClass($dsn, $knownDrivers)) {
+ throw new \LogicException(sprintf(
+ 'To use given scheme "%s" a package has to be installed. Run "composer req %s" to add it.',
+ $dsn->getScheme(),
+ $knownDrivers[$driverClass]['package']
+ ));
+ }
+
+ throw new \LogicException(sprintf(
+ 'A given scheme "%s" is not supported. Maybe it is a custom driver, make sure you registered it with "%s::addDriver".',
+ $dsn->getScheme(),
+ Resources::class
+ ));
+ }
+
+ private function findDriverClass(Dsn $dsn, array $factories): ?string
+ {
+ $protocol = $dsn->getSchemeProtocol();
+ foreach ($factories as $driverClass => $info) {
+ if (false == in_array($protocol, $info['schemes'], true)) {
+ continue;
+ }
+
+ if (false == $dsn->getSchemeExtensions()) {
+ return $driverClass;
+ }
+
+ if (empty($info['requiredSchemeExtensions'])) {
+ continue;
+ }
+
+ $diff = array_diff($dsn->getSchemeExtensions(), $info['requiredSchemeExtensions']);
+ if (empty($diff)) {
+ return $driverClass;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/pkg/enqueue/Client/DriverFactoryInterface.php b/pkg/enqueue/Client/DriverFactoryInterface.php
new file mode 100644
index 000000000..ffc96ee82
--- /dev/null
+++ b/pkg/enqueue/Client/DriverFactoryInterface.php
@@ -0,0 +1,10 @@
+ [
+ * schemes => [schemes strings],
+ * package => package name,
+ * ].
+ *
+ * @var array
+ */
+ private static $knownDrivers = null;
+
+ private function __construct()
+ {
+ }
+
+ public static function getAvailableDrivers(): array
+ {
+ $map = self::getKnownDrivers();
+
+ $availableMap = [];
+ foreach ($map as $driverClass => $item) {
+ if (class_exists($driverClass)) {
+ $availableMap[$driverClass] = $item;
+ }
+ }
+
+ return $availableMap;
+ }
+
+ public static function getKnownDrivers(): array
+ {
+ if (null === self::$knownDrivers) {
+ $map = [];
+
+ $map[AmqpDriver::class] = [
+ 'schemes' => ['amqp'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/enqueue',
+ ];
+ $map[RabbitMqDriver::class] = [
+ 'schemes' => ['amqp'],
+ 'requiredSchemeExtensions' => ['rabbitmq'],
+ 'package' => 'enqueue/enqueue',
+ ];
+ $map[FsDriver::class] = [
+ 'schemes' => ['file'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/fs',
+ ];
+ $map[NullDriver::class] = [
+ 'schemes' => ['null'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/null',
+ ];
+ $map[GpsDriver::class] = [
+ 'schemes' => ['gps'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/gps',
+ ];
+ $map[RedisDriver::class] = [
+ 'schemes' => ['redis'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/redis',
+ ];
+ $map[SqsDriver::class] = [
+ 'schemes' => ['sqs'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/sqs',
+ ];
+ $map[StompDriver::class] = [
+ 'schemes' => ['stomp'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/stomp',
+ ];
+ $map[RabbitMqStompDriver::class] = [
+ 'schemes' => ['stomp'],
+ 'requiredSchemeExtensions' => ['rabbitmq'],
+ 'package' => 'enqueue/stomp',
+ ];
+ $map[RdKafkaDriver::class] = [
+ 'schemes' => ['kafka', 'rdkafka'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/rdkafka',
+ ];
+ $map[MongodbDriver::class] = [
+ 'schemes' => ['mongodb'],
+ 'requiredSchemeExtensions' => [],
+ 'package' => 'enqueue/mongodb',
+ ];
+ $map[DbalDriver::class] = [
+ 'schemes' => [
+ 'db2',
+ 'ibm-db2',
+ 'mssql',
+ 'sqlsrv',
+ 'mysql',
+ 'mysql2',
+ 'mysql',
+ 'pgsql',
+ 'postgres',
+ 'pgsql',
+ 'sqlite',
+ 'sqlite3',
+ 'sqlite',
+ ],
+ 'requiredSchemeExtensions' => ['pdo'],
+ 'package' => 'enqueue/dbal',
+ ];
+
+ self::$knownDrivers = $map;
+ }
+
+ return self::$knownDrivers;
+ }
+
+ public static function addDriver(string $driverClass, array $schemes, array $requiredExtensions, string $package): void
+ {
+ if (class_exists($driverClass)) {
+ if (false == (new \ReflectionClass($driverClass))->implementsInterface(DriverInterface::class)) {
+ throw new \InvalidArgumentException(sprintf('The driver class "%s" must implement "%s" interface.', $driverClass, DriverInterface::class));
+ }
+ }
+
+ if (empty($schemes)) {
+ throw new \InvalidArgumentException('Schemes could not be empty.');
+ }
+ if (empty($package)) {
+ throw new \InvalidArgumentException('Package name could not be empty.');
+ }
+
+ self::getKnownDrivers();
+ self::$knownDrivers[$driverClass] = [
+ 'schemes' => $schemes,
+ 'requiredSchemeExtensions' => $requiredExtensions,
+ 'package' => $package,
+ ];
+ }
+}
diff --git a/pkg/enqueue/ConnectionFactoryFactory.php b/pkg/enqueue/ConnectionFactoryFactory.php
new file mode 100644
index 000000000..9069f430e
--- /dev/null
+++ b/pkg/enqueue/ConnectionFactoryFactory.php
@@ -0,0 +1,58 @@
+findFactoryClass($dsn, Resources::getAvailableConnections())) {
+ return new $factoryClass((string) $dsn);
+ }
+
+ $knownConnections = Resources::getKnownConnections();
+ if ($factoryClass = $this->findFactoryClass($dsn, $knownConnections)) {
+ throw new \LogicException(sprintf(
+ 'To use given scheme "%s" a package has to be installed. Run "composer req %s" to add it.',
+ $dsn->getScheme(),
+ $knownConnections[$factoryClass]['package']
+ ));
+ }
+
+ throw new \LogicException(sprintf(
+ 'A given scheme "%s" is not supported. Maybe it is a custom connection, make sure you registered it with "%s::addConnection".',
+ $dsn->getScheme(),
+ Resources::class
+ ));
+ }
+
+ private function findFactoryClass(Dsn $dsn, array $factories): ?string
+ {
+ $protocol = $dsn->getSchemeProtocol();
+ foreach ($factories as $connectionClass => $info) {
+ if (false == in_array($protocol, $info['schemes'], true)) {
+ continue;
+ }
+
+ if (false == $dsn->getSchemeExtensions()) {
+ return $connectionClass;
+ }
+
+ if (empty($info['supportedSchemeExtensions'])) {
+ continue;
+ }
+
+ $diff = array_diff($dsn->getSchemeExtensions(), $info['supportedSchemeExtensions']);
+ if (empty($diff)) {
+ return $connectionClass;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/pkg/enqueue/ConnectionFactoryFactoryInterface.php b/pkg/enqueue/ConnectionFactoryFactoryInterface.php
new file mode 100644
index 000000000..4c74a5e57
--- /dev/null
+++ b/pkg/enqueue/ConnectionFactoryFactoryInterface.php
@@ -0,0 +1,10 @@
+ [
+ * schemes => [schemes strings],
+ * package => package name,
+ * ].
+ *
+ * @var array
+ */
+ private static $knownConnections = null;
+
+ private function __construct()
+ {
+ }
+
+ public static function getAvailableConnections(): array
+ {
+ $map = self::getKnownConnections();
+
+ $availableMap = [];
+ foreach ($map as $connectionClass => $item) {
+ if (class_exists($connectionClass)) {
+ $availableMap[$connectionClass] = $item;
+ }
+ }
+
+ return $availableMap;
+ }
+
+ public static function getKnownSchemes(): array
+ {
+ $map = self::getKnownConnections();
+
+ $schemes = [];
+ foreach ($map as $connectionClass => $item) {
+ foreach ($item['schemes'] as $scheme) {
+ $schemes[$scheme] = $connectionClass;
+ }
+ }
+
+ return $schemes;
+ }
+
+ public static function getAvailableSchemes(): array
+ {
+ $map = self::getAvailableConnections();
+
+ $schemes = [];
+ foreach ($map as $connectionClass => $item) {
+ foreach ($item['schemes'] as $scheme) {
+ $schemes[$scheme] = $connectionClass;
+ }
+ }
+
+ return $schemes;
+ }
+
+ public static function getKnownConnections(): array
+ {
+ if (null === self::$knownConnections) {
+ $map = [];
+
+ $map[FsConnectionFactory::class] = [
+ 'schemes' => ['file'],
+ 'supportedSchemeExtensions' => [],
+ 'package' => 'enqueue/fs',
+ ];
+ $map[AmqpExtConnectionFactory::class] = [
+ 'schemes' => ['amqp', 'amqps'],
+ 'supportedSchemeExtensions' => ['ext', 'rabbitmq'],
+ 'package' => 'enqueue/amqp-ext',
+ ];
+ $map[AmqpBunnyConnectionFactory::class] = [
+ 'schemes' => ['amqp'],
+ 'supportedSchemeExtensions' => ['bunny', 'rabbitmq'],
+ 'package' => 'enqueue/amqp-bunny',
+ ];
+ $map[AmqpLibConnectionFactory::class] = [
+ 'schemes' => ['amqp', 'amqps'],
+ 'supportedSchemeExtensions' => ['lib', 'rabbitmq'],
+ 'package' => 'enqueue/amqp-lib',
+ ];
+
+ $map[DbalConnectionFactory::class] = [
+ 'schemes' => [
+ 'db2',
+ 'ibm-db2',
+ 'mssql',
+ 'sqlsrv',
+ 'mysql',
+ 'mysql2',
+ 'mysql',
+ 'pgsql',
+ 'postgres',
+ 'sqlite',
+ 'sqlite3',
+ 'sqlite',
+ ],
+ 'supportedSchemeExtensions' => ['pdo'],
+ 'package' => 'enqueue/dbal',
+ ];
+
+ $map[NullConnectionFactory::class] = [
+ 'schemes' => ['null'],
+ 'supportedSchemeExtensions' => [],
+ 'package' => 'enqueue/null',
+ ];
+ $map[GearmanConnectionFactory::class] = [
+ 'schemes' => ['gearman'],
+ 'supportedSchemeExtensions' => [],
+ 'package' => 'enqueue/gearman',
+ ];
+ $map[PheanstalkConnectionFactory::class] = [
+ 'schemes' => ['beanstalk'],
+ 'supportedSchemeExtensions' => ['pheanstalk'],
+ 'package' => 'enqueue/pheanstalk',
+ ];
+ $map[RdKafkaConnectionFactory::class] = [
+ 'schemes' => ['kafka', 'rdkafka'],
+ 'supportedSchemeExtensions' => ['rdkafka'],
+ 'package' => 'enqueue/rdkafka',
+ ];
+ $map[RedisConnectionFactory::class] = [
+ 'schemes' => ['redis'],
+ 'supportedSchemeExtensions' => ['predis', 'phpredis'],
+ 'package' => 'enqueue/redis',
+ ];
+ $map[StompConnectionFactory::class] = [
+ 'schemes' => ['stomp'],
+ 'supportedSchemeExtensions' => ['rabbitmq'],
+ 'package' => 'enqueue/stomp', ];
+ $map[SqsConnectionFactory::class] = [
+ 'schemes' => ['sqs'],
+ 'supportedSchemeExtensions' => [],
+ 'package' => 'enqueue/sqs', ];
+ $map[GpsConnectionFactory::class] = [
+ 'schemes' => ['gps'],
+ 'supportedSchemeExtensions' => [],
+ 'package' => 'enqueue/gps', ];
+ $map[MongodbConnectionFactory::class] = [
+ 'schemes' => ['mongodb'],
+ 'supportedSchemeExtensions' => [],
+ 'package' => 'enqueue/mongodb',
+ ];
+
+ self::$knownConnections = $map;
+ }
+
+ return self::$knownConnections;
+ }
+
+ public static function addConnection(string $connectionFactoryClass, array $schemes, array $extensions, string $package): void
+ {
+ if (class_exists($connectionFactoryClass)) {
+ if (false == (new \ReflectionClass($connectionFactoryClass))->implementsInterface(PsrConnectionFactory::class)) {
+ throw new \InvalidArgumentException(sprintf('The connection factory class "%s" must implement "%s" interface.', $connectionFactoryClass, PsrConnectionFactory::class));
+ }
+ }
+
+ if (empty($schemes)) {
+ throw new \InvalidArgumentException('Schemes could not be empty.');
+ }
+ if (empty($package)) {
+ throw new \InvalidArgumentException('Package name could not be empty.');
+ }
+
+ self::getKnownConnections();
+ self::$knownConnections[$connectionFactoryClass] = [
+ 'schemes' => $schemes,
+ 'supportedSchemeExtensions' => $extensions,
+ 'package' => $package,
+ ];
+ }
+}
diff --git a/pkg/enqueue/Symfony/AmqpTransportFactory.php b/pkg/enqueue/Symfony/AmqpTransportFactory.php
index e69bf44de..cf6f34310 100644
--- a/pkg/enqueue/Symfony/AmqpTransportFactory.php
+++ b/pkg/enqueue/Symfony/AmqpTransportFactory.php
@@ -6,13 +6,13 @@
use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory;
use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory;
use Enqueue\Client\Amqp\AmqpDriver;
+use Enqueue\ConnectionFactoryFactory;
use Interop\Amqp\AmqpConnectionFactory;
use Interop\Amqp\AmqpContext;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;
-use function Enqueue\dsn_to_connection_factory;
class AmqpTransportFactory implements TransportFactoryInterface, DriverFactoryInterface
{
@@ -221,7 +221,7 @@ public static function createConnectionFactoryFactory(array $config)
}
$dsn = array_key_exists('dsn', $config) ? $config['dsn'] : 'amqp:';
- $factory = dsn_to_connection_factory($dsn);
+ $factory = (new ConnectionFactoryFactory())->create($dsn);
if (false == $factory instanceof AmqpConnectionFactory) {
throw new \LogicException(sprintf('Factory must be instance of "%s" but got "%s"', AmqpConnectionFactory::class, get_class($factory)));
diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php
index fa45d2544..a65f2bf13 100644
--- a/pkg/enqueue/Symfony/DefaultTransportFactory.php
+++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php
@@ -2,29 +2,13 @@
namespace Enqueue\Symfony;
-use Enqueue\Dbal\DbalConnectionFactory;
-use Enqueue\Dbal\Symfony\DbalTransportFactory;
-use Enqueue\Fs\FsConnectionFactory;
-use Enqueue\Fs\Symfony\FsTransportFactory;
-use Enqueue\Gps\GpsConnectionFactory;
-use Enqueue\Gps\Symfony\GpsTransportFactory;
-use Enqueue\Mongodb\MongodbConnectionFactory;
-use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
-use Enqueue\Null\NullConnectionFactory;
-use Enqueue\Null\Symfony\NullTransportFactory;
-use Enqueue\RdKafka\RdKafkaConnectionFactory;
-use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
-use Enqueue\Redis\RedisConnectionFactory;
-use Enqueue\Redis\Symfony\RedisTransportFactory;
-use Enqueue\Sqs\SqsConnectionFactory;
-use Enqueue\Sqs\Symfony\SqsTransportFactory;
-use Enqueue\Stomp\StompConnectionFactory;
-use Enqueue\Stomp\Symfony\StompTransportFactory;
-use Interop\Amqp\AmqpConnectionFactory;
+use Enqueue\Client\DriverInterface;
+use Interop\Queue\PsrConnectionFactory;
+use Interop\Queue\PsrContext;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\Alias;
use Symfony\Component\DependencyInjection\ContainerBuilder;
-use function Enqueue\dsn_to_connection_factory;
+use Symfony\Component\DependencyInjection\Reference;
class DefaultTransportFactory implements TransportFactoryInterface, DriverFactoryInterface
{
@@ -64,8 +48,11 @@ public function addConfiguration(ArrayNodeDefinition $builder)
if (is_string($v)) {
return false !== strpos($v, ':') || false !== strpos($v, 'env_') ?
['dsn' => $v] :
- ['alias' => $v];
+ ['alias' => $v]
+ ;
}
+
+ throw new \LogicException(sprintf('The value must be array, null or string. Got "%s"', gettype($v)));
})
->end()
->children()
@@ -78,17 +65,17 @@ public function addConfiguration(ArrayNodeDefinition $builder)
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
if (isset($config['alias'])) {
$aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']);
+ $container->setAlias($factoryId, new Alias($aliasId, true));
} else {
- $dsn = $this->resolveDSN($container, $config['dsn']);
-
- $aliasId = $this->findFactory($dsn)->createConnectionFactory($container, $config);
+ $container->register($factoryId, PsrConnectionFactory::class)
+ ->setFactory([new Reference('enqueue.connection_factory_factory'), 'create'])
+ ->addArgument($config['dsn'])
+ ;
}
- $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
-
- $container->setAlias($factoryId, new Alias($aliasId, true));
$container->setAlias('enqueue.transport.connection_factory', new Alias($factoryId, true));
return $factoryId;
@@ -99,17 +86,18 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf
*/
public function createContext(ContainerBuilder $container, array $config)
{
+ $contextId = sprintf('enqueue.transport.%s.context', $this->getName());
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+
if (isset($config['alias'])) {
$aliasId = sprintf('enqueue.transport.%s.context', $config['alias']);
+ $container->setAlias($contextId, new Alias($aliasId, true));
} else {
- $dsn = $this->resolveDSN($container, $config['dsn']);
-
- $aliasId = $this->findFactory($dsn)->createContext($container, $config);
+ $container->register($contextId, PsrContext::class)
+ ->setFactory([new Reference($factoryId), 'createContext'])
+ ;
}
- $contextId = sprintf('enqueue.transport.%s.context', $this->getName());
-
- $container->setAlias($contextId, new Alias($aliasId, true));
$container->setAlias('enqueue.transport.context', new Alias($contextId, true));
return $contextId;
@@ -120,17 +108,21 @@ public function createContext(ContainerBuilder $container, array $config)
*/
public function createDriver(ContainerBuilder $container, array $config)
{
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+ $driverId = sprintf('enqueue.client.%s.driver', $this->getName());
+
if (isset($config['alias'])) {
$aliasId = sprintf('enqueue.client.%s.driver', $config['alias']);
+ $container->setAlias($driverId, new Alias($aliasId, true));
} else {
- $dsn = $this->resolveDSN($container, $config['dsn']);
-
- $aliasId = $this->findFactory($dsn)->createDriver($container, $config);
+ $container->register($driverId, DriverInterface::class)
+ ->setFactory([new Reference('enqueue.client.driver_factory'), 'create'])
+ ->addArgument(new Reference($factoryId))
+ ->addArgument($config['dsn'])
+ ->addArgument($config)
+ ;
}
- $driverId = sprintf('enqueue.client.%s.driver', $this->getName());
-
- $container->setAlias($driverId, new Alias($aliasId, true));
$container->setAlias('enqueue.client.driver', new Alias($driverId, true));
return $driverId;
@@ -143,88 +135,4 @@ public function getName()
{
return $this->name;
}
-
- /**
- * This is a quick fix to the exception "Incompatible use of dynamic environment variables "ENQUEUE_DSN" found in parameters."
- * TODO: We'll have to come up with a better solution.
- *
- * @param ContainerBuilder $container
- * @param $dsn
- *
- * @return array|false|string
- */
- private function resolveDSN(ContainerBuilder $container, $dsn)
- {
- if (method_exists($container, 'resolveEnvPlaceholders')) {
- $dsn = $container->resolveEnvPlaceholders($dsn);
-
- $matches = [];
- if (preg_match('/%env\((.*?)\)/', $dsn, $matches)) {
- if (false === $realDsn = getenv($matches[1])) {
- throw new \LogicException(sprintf('The env "%s" var is not defined', $matches[1]));
- }
-
- return $realDsn;
- }
- }
-
- return $dsn;
- }
-
- /**
- * @param string
- * @param mixed $dsn
- *
- * @return TransportFactoryInterface
- */
- private function findFactory($dsn)
- {
- $factory = dsn_to_connection_factory($dsn);
-
- if ($factory instanceof AmqpConnectionFactory) {
- return new AmqpTransportFactory('default_amqp');
- }
-
- if ($factory instanceof FsConnectionFactory) {
- return new FsTransportFactory('default_fs');
- }
-
- if ($factory instanceof DbalConnectionFactory) {
- return new DbalTransportFactory('default_dbal');
- }
-
- if ($factory instanceof NullConnectionFactory) {
- return new NullTransportFactory('default_null');
- }
-
- if ($factory instanceof GpsConnectionFactory) {
- return new GpsTransportFactory('default_gps');
- }
-
- if ($factory instanceof RedisConnectionFactory) {
- return new RedisTransportFactory('default_redis');
- }
-
- if ($factory instanceof SqsConnectionFactory) {
- return new SqsTransportFactory('default_sqs');
- }
-
- if ($factory instanceof StompConnectionFactory) {
- return new StompTransportFactory('default_stomp');
- }
-
- if ($factory instanceof RdKafkaConnectionFactory) {
- return new RdKafkaTransportFactory('default_kafka');
- }
-
- if ($factory instanceof MongodbConnectionFactory) {
- return new MongodbTransportFactory('default_mongodb');
- }
-
- throw new \LogicException(sprintf(
- 'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
- get_class($factory),
- $dsn
- ));
- }
}
diff --git a/pkg/enqueue/Tests/Client/DriverFactoryTest.php b/pkg/enqueue/Tests/Client/DriverFactoryTest.php
new file mode 100644
index 000000000..336158c6a
--- /dev/null
+++ b/pkg/enqueue/Tests/Client/DriverFactoryTest.php
@@ -0,0 +1,179 @@
+assertTrue($rc->implementsInterface(DriverFactoryInterface::class));
+ }
+
+ public function testShouldBeFinal()
+ {
+ $rc = new \ReflectionClass(DriverFactory::class);
+
+ $this->assertTrue($rc->isFinal());
+ }
+
+ public function testCouldBeConstructedWithConfigAndQueueMetaAsArguments()
+ {
+ new DriverFactory($this->createConfigMock(), $this->createQueueMetaRegistryMock());
+ }
+
+ public function testThrowIfPackageThatSupportSchemeNotInstalled()
+ {
+ $scheme = 'scheme5b7aa7d7cd213';
+ $class = 'ConnectionClass5b7aa7d7cd213';
+
+ Resources::addDriver($class, [$scheme], [], 'thePackage');
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('To use given scheme "scheme5b7aa7d7cd213" a package has to be installed. Run "composer req thePackage" to add it.');
+ $factory = new DriverFactory($this->createConfigMock(), $this->createQueueMetaRegistryMock());
+
+ $factory->create($this->createConnectionFactoryMock(), $scheme.'://foo', []);
+ }
+
+ public function testThrowIfSchemeIsNotKnown()
+ {
+ $scheme = 'scheme5b7aa862e70a5';
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('A given scheme "scheme5b7aa862e70a5" is not supported. Maybe it is a custom driver, make sure you registered it with "Enqueue\Client\Resources::addDriver".');
+
+ $factory = new DriverFactory($this->createConfigMock(), $this->createQueueMetaRegistryMock());
+
+ $factory->create($this->createConnectionFactoryMock(), $scheme.'://foo', []);
+ }
+
+ public function testThrowIfDsnInvalid()
+ {
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('The DSN is invalid. It does not have scheme separator ":".');
+
+ $factory = new DriverFactory($this->createConfigMock(), $this->createQueueMetaRegistryMock());
+
+ $factory->create($this->createConnectionFactoryMock(), 'invalidDsn', []);
+ }
+
+ /**
+ * @dataProvider provideDSN
+ */
+ public function testReturnsExpectedFactories(
+ string $dsn,
+ string $connectionFactoryClass,
+ string $contextClass,
+ array $conifg,
+ string $expectedDriverClass
+ ) {
+ $connectionFactoryMock = $this->createMock($connectionFactoryClass);
+ $connectionFactoryMock
+ ->expects($this->once())
+ ->method('createContext')
+ ->willReturn($this->createMock($contextClass))
+ ;
+
+ $driverFactory = new DriverFactory($this->createConfigMock(), $this->createQueueMetaRegistryMock());
+
+ $driver = $driverFactory->create($connectionFactoryMock, $dsn, $conifg);
+
+ $this->assertInstanceOf($expectedDriverClass, $driver);
+ }
+
+ public static function provideDSN()
+ {
+ yield ['null:', NullConnectionFactory::class, NullContext::class, [], NullDriver::class];
+
+ yield ['amqp:', AmqpConnectionFactory::class, AmqpContext::class, [], AmqpDriver::class];
+
+ yield ['amqp+rabbitmq:', AmqpConnectionFactory::class, AmqpContext::class, [], RabbitMqDriver::class];
+
+ yield ['mysql:', DbalConnectionFactory::class, DbalContext::class, [], DbalDriver::class];
+
+ yield ['file:', FsConnectionFactory::class, FsContext::class, [], FsDriver::class];
+
+ // https://github.com/php-enqueue/enqueue-dev/issues/511
+// yield ['gearman:', GearmanConnectionFactory::class, NullContext::class, [], NullDriver::class];
+
+ yield ['gps:', GpsConnectionFactory::class, GpsContext::class, [], GpsDriver::class];
+
+ yield ['mongodb:', MongodbConnectionFactory::class, MongodbContext::class, [], MongodbDriver::class];
+
+ yield ['kafka:', RdKafkaConnectionFactory::class, RdKafkaContext::class, [], RdKafkaDriver::class];
+
+ yield ['redis:', RedisConnectionFactory::class, RedisContext::class, [], RedisDriver::class];
+
+ yield ['sqs:', SqsConnectionFactory::class, SqsContext::class, [], SqsDriver::class];
+
+ yield ['stomp:', StompConnectionFactory::class, StompContext::class, [], StompDriver::class];
+
+ yield ['stomp+rabbitmq:', StompConnectionFactory::class, StompContext::class, [], RabbitMqStompDriver::class];
+
+ yield ['stomp+rabbitmq:', StompConnectionFactory::class, StompContext::class, [
+ 'rabbitmq_management_dsn' => 'http://guest:guest@localhost:15672/mqdev',
+ ], RabbitMqStompDriver::class];
+
+ yield ['stomp+rabbitmq:', StompConnectionFactory::class, StompContext::class, [
+ 'management_plugin_port' => 1234,
+ ], RabbitMqStompDriver::class];
+ }
+
+ private function createConnectionFactoryMock(): PsrConnectionFactory
+ {
+ return $this->createMock(PsrConnectionFactory::class);
+ }
+
+ private function createConfigMock(): Config
+ {
+ return $this->createMock(Config::class);
+ }
+
+ private function createQueueMetaRegistryMock()
+ {
+ return $this->createMock(QueueMetaRegistry::class);
+ }
+}
diff --git a/pkg/enqueue/Tests/Client/ResourcesTest.php b/pkg/enqueue/Tests/Client/ResourcesTest.php
new file mode 100644
index 000000000..55ff951da
--- /dev/null
+++ b/pkg/enqueue/Tests/Client/ResourcesTest.php
@@ -0,0 +1,146 @@
+assertTrue($rc->isFinal());
+ }
+
+ public function testShouldConstructorBePrivate()
+ {
+ $rc = new \ReflectionClass(Resources::class);
+
+ $this->assertTrue($rc->getConstructor()->isPrivate());
+ }
+
+ public function testShouldGetAvailableDriverInExpectedFormat()
+ {
+ $availableDrivers = Resources::getAvailableDrivers();
+
+ $this->assertInternalType('array', $availableDrivers);
+ $this->assertArrayHasKey(RedisDriver::class, $availableDrivers);
+
+ $driverInfo = $availableDrivers[RedisDriver::class];
+ $this->assertArrayHasKey('schemes', $driverInfo);
+ $this->assertSame(['redis'], $driverInfo['schemes']);
+
+ $this->assertArrayHasKey('requiredSchemeExtensions', $driverInfo);
+ $this->assertSame([], $driverInfo['requiredSchemeExtensions']);
+
+ $this->assertArrayHasKey('package', $driverInfo);
+ $this->assertSame('enqueue/redis', $driverInfo['package']);
+ }
+
+ public function testShouldGetAvailableDriverWithRequiredExtensionInExpectedFormat()
+ {
+ $availableDrivers = Resources::getAvailableDrivers();
+
+ $this->assertInternalType('array', $availableDrivers);
+ $this->assertArrayHasKey(RabbitMqDriver::class, $availableDrivers);
+
+ $driverInfo = $availableDrivers[RabbitMqDriver::class];
+ $this->assertArrayHasKey('schemes', $driverInfo);
+ $this->assertSame(['amqp'], $driverInfo['schemes']);
+
+ $this->assertArrayHasKey('requiredSchemeExtensions', $driverInfo);
+ $this->assertSame(['rabbitmq'], $driverInfo['requiredSchemeExtensions']);
+
+ $this->assertArrayHasKey('package', $driverInfo);
+ $this->assertSame('enqueue/enqueue', $driverInfo['package']);
+ }
+
+ public function testShouldGetKnownDriversInExpectedFormat()
+ {
+ $knownDrivers = Resources::getKnownDrivers();
+
+ $this->assertInternalType('array', $knownDrivers);
+ $this->assertArrayHasKey(RedisDriver::class, $knownDrivers);
+
+ $driverInfo = $knownDrivers[RedisDriver::class];
+ $this->assertArrayHasKey('schemes', $driverInfo);
+ $this->assertSame(['redis'], $driverInfo['schemes']);
+
+ $this->assertArrayHasKey('requiredSchemeExtensions', $driverInfo);
+ $this->assertSame([], $driverInfo['requiredSchemeExtensions']);
+ }
+
+ public function testThrowsIfDriverClassNotImplementDriverFactoryInterfaceOnAddDriver()
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('The driver class "stdClass" must implement "Enqueue\Client\DriverInterface" interface.');
+
+ Resources::addDriver(\stdClass::class, [], [], 'foo');
+ }
+
+ public function testThrowsIfNoSchemesProvidedOnAddDriver()
+ {
+ $driverClass = $this->getMockClass(DriverInterface::class);
+
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Schemes could not be empty.');
+
+ Resources::addDriver($driverClass, [], [], 'foo');
+ }
+
+ public function testThrowsIfNoPackageProvidedOnAddDriver()
+ {
+ $driverClass = $this->getMockClass(DriverInterface::class);
+
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Package name could not be empty.');
+
+ Resources::addDriver($driverClass, ['foo'], [], '');
+ }
+
+ public function testShouldAllowRegisterDriverThatIsNotInstalled()
+ {
+ Resources::addDriver('theDriverClass', ['foo'], ['barExtension'], 'foo');
+
+ $knownDrivers = Resources::getKnownDrivers();
+ $this->assertInternalType('array', $knownDrivers);
+ $this->assertArrayHasKey('theDriverClass', $knownDrivers);
+
+ $availableDrivers = Resources::getAvailableDrivers();
+
+ $this->assertInternalType('array', $availableDrivers);
+ $this->assertArrayNotHasKey('theDriverClass', $availableDrivers);
+ }
+
+ public function testShouldAllowGetPreviouslyRegisteredDriver()
+ {
+ $driverClass = $this->getMockClass(DriverInterface::class);
+
+ Resources::addDriver(
+ $driverClass,
+ ['fooscheme', 'barscheme'],
+ ['fooextension', 'barextension'],
+ 'foo/bar'
+ );
+
+ $availableDrivers = Resources::getAvailableDrivers();
+
+ $this->assertInternalType('array', $availableDrivers);
+ $this->assertArrayHasKey($driverClass, $availableDrivers);
+
+ $driverInfo = $availableDrivers[$driverClass];
+ $this->assertArrayHasKey('schemes', $driverInfo);
+ $this->assertSame(['fooscheme', 'barscheme'], $driverInfo['schemes']);
+
+ $this->assertArrayHasKey('requiredSchemeExtensions', $driverInfo);
+ $this->assertSame(['fooextension', 'barextension'], $driverInfo['requiredSchemeExtensions']);
+
+ $this->assertArrayHasKey('package', $driverInfo);
+ $this->assertSame('foo/bar', $driverInfo['package']);
+ }
+}
diff --git a/pkg/enqueue/Tests/ConnectionFactoryFactoryTest.php b/pkg/enqueue/Tests/ConnectionFactoryFactoryTest.php
new file mode 100644
index 000000000..bdfdb045e
--- /dev/null
+++ b/pkg/enqueue/Tests/ConnectionFactoryFactoryTest.php
@@ -0,0 +1,133 @@
+assertTrue($rc->implementsInterface(ConnectionFactoryFactoryInterface::class));
+ }
+
+ public function testShouldBeFinal()
+ {
+ $rc = new \ReflectionClass(ConnectionFactoryFactory::class);
+
+ $this->assertTrue($rc->isFinal());
+ }
+
+ public function testCouldBeConstructedWithoutAnyArguments()
+ {
+ new ConnectionFactoryFactory();
+ }
+
+ public function testThrowIfPackageThatSupportSchemeNotInstalled()
+ {
+ $scheme = 'scheme5b7aa7d7cd213';
+ $class = 'ConnectionClass5b7aa7d7cd213';
+
+ Resources::addConnection($class, [$scheme], [], 'thePackage');
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('To use given scheme "scheme5b7aa7d7cd213" a package has to be installed. Run "composer req thePackage" to add it.');
+ (new ConnectionFactoryFactory())->create($scheme.'://foo');
+ }
+
+ public function testThrowIfSchemeIsNotKnown()
+ {
+ $scheme = 'scheme5b7aa862e70a5';
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('A given scheme "scheme5b7aa862e70a5" is not supported. Maybe it is a custom connection, make sure you registered it with "Enqueue\Resources::addConnection".');
+ (new ConnectionFactoryFactory())->create($scheme.'://foo');
+ }
+
+ public function testThrowIfDsnInvalid()
+ {
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('The DSN is invalid. It does not have scheme separator ":".');
+
+ (new ConnectionFactoryFactory())->create('invalid-scheme');
+ }
+
+ /**
+ * @dataProvider provideDSN
+ */
+ public function testReturnsExpectedFactories(string $dsn, string $expectedFactoryClass)
+ {
+ $connectionFactory = (new ConnectionFactoryFactory())->create($dsn);
+
+ $this->assertInstanceOf($expectedFactoryClass, $connectionFactory);
+ }
+
+ public static function provideDSN()
+ {
+ yield ['null:', NullConnectionFactory::class];
+
+ yield ['amqp:', AmqpExtConnectionFactory::class];
+
+ yield ['amqp+ext:', AmqpExtConnectionFactory::class];
+
+// yield ['amqp+rabbitmq:', AmqpExtConnectionFactory::class];
+
+// yield ['amqp+rabbitmq+ext:', AmqpExtConnectionFactory::class];
+
+ yield ['amqps:', AmqpExtConnectionFactory::class];
+
+ yield ['amqps+ext:', AmqpExtConnectionFactory::class];
+
+// yield ['amqps+rabbitmq:', AmqpExtConnectionFactory::class];
+
+// yield ['amqps+ext+rabbitmq:', AmqpExtConnectionFactory::class];
+
+ yield ['amqp+bunny:', AmqpBunnyConnectionFactory::class];
+
+ yield ['amqp+lib:', AmqpLibConnectionFactory::class];
+
+ yield ['mssql:', DbalConnectionFactory::class];
+
+ yield ['mysql:', DbalConnectionFactory::class];
+
+ yield ['pgsql:', DbalConnectionFactory::class];
+
+ yield ['file:', FsConnectionFactory::class];
+
+ // https://github.com/php-enqueue/enqueue-dev/issues/511
+// yield ['gearman:', GearmanConnectionFactory::class];
+
+ yield ['gps:', GpsConnectionFactory::class];
+
+ yield ['mongodb:', MongodbConnectionFactory::class];
+
+ yield ['beanstalk:', PheanstalkConnectionFactory::class];
+
+ yield ['kafka:', RdKafkaConnectionFactory::class];
+
+ yield ['redis:', RedisConnectionFactory::class];
+
+ yield ['sqs:', SqsConnectionFactory::class];
+
+ yield ['stomp:', StompConnectionFactory::class];
+ }
+}
diff --git a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php b/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php
deleted file mode 100644
index 2c65eb6d9..000000000
--- a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php
+++ /dev/null
@@ -1,104 +0,0 @@
-expectException(\LogicException::class);
- $this->expectExceptionMessage('The scheme could not be parsed from DSN ""');
-
- \Enqueue\dsn_to_connection_factory('');
- }
-
- public function testThrowIfDsnMissingScheme()
- {
- $this->expectException(\LogicException::class);
- $this->expectExceptionMessage('The scheme could not be parsed from DSN "dsnMissingScheme"');
-
- \Enqueue\dsn_to_connection_factory('dsnMissingScheme');
- }
-
- public function testThrowIfDsnNotSupported()
- {
- $this->expectException(\LogicException::class);
- $this->expectExceptionMessage('The scheme "http" is not supported. Supported "file", "amqp+ext"');
-
- \Enqueue\dsn_to_connection_factory('http://schemeNotSupported');
- }
-
- /**
- * @dataProvider provideDSNs
- *
- * @param mixed $dsn
- * @param mixed $expectedFactoryClass
- */
- public function testReturnsExpectedFactoryInstance($dsn, $expectedFactoryClass)
- {
- $factory = \Enqueue\dsn_to_connection_factory($dsn);
-
- $this->assertInstanceOf($expectedFactoryClass, $factory);
- }
-
- public static function provideDSNs()
- {
- yield ['amqp:', AmqpExtConnectionFactory::class];
-
- yield ['amqps:', AmqpExtConnectionFactory::class];
-
- yield ['amqp+ext:', AmqpExtConnectionFactory::class];
-
- yield ['amqps+ext:', AmqpExtConnectionFactory::class];
-
- yield ['amqp+lib:', AmqpLibConnectionFactory::class];
-
- yield ['amqps+lib:', AmqpLibConnectionFactory::class];
-
- yield ['amqp+bunny:', AmqpBunnyConnectionFactory::class];
-
- yield ['amqp://user:pass@foo/vhost', AmqpExtConnectionFactory::class];
-
- yield ['file:', FsConnectionFactory::class];
-
- yield ['file:///foo/bar/baz', FsConnectionFactory::class];
-
- yield ['null:', NullConnectionFactory::class];
-
- yield ['mysql:', DbalConnectionFactory::class];
-
- yield ['pgsql:', DbalConnectionFactory::class];
-
- yield ['beanstalk:', PheanstalkConnectionFactory::class];
-
-// yield ['gearman:', GearmanConnectionFactory::class];
-
- yield ['kafka:', RdKafkaConnectionFactory::class];
-
- yield ['redis:', RedisConnectionFactory::class];
-
- yield ['stomp:', StompConnectionFactory::class];
-
- yield ['sqs:', SqsConnectionFactory::class];
-
- yield ['gps:', GpsConnectionFactory::class];
-
- yield ['mongodb:', MongodbConnectionFactory::class];
- }
-}
diff --git a/pkg/enqueue/Tests/Functions/DsnToContextFunctionTest.php b/pkg/enqueue/Tests/Functions/DsnToContextFunctionTest.php
deleted file mode 100644
index 2a8bc83bc..000000000
--- a/pkg/enqueue/Tests/Functions/DsnToContextFunctionTest.php
+++ /dev/null
@@ -1,73 +0,0 @@
-expectException(\LogicException::class);
- $this->expectExceptionMessage('The scheme could not be parsed from DSN ""');
-
- \Enqueue\dsn_to_context('');
- }
-
- public function testThrowIfDsnMissingScheme()
- {
- $this->expectException(\LogicException::class);
- $this->expectExceptionMessage('The scheme could not be parsed from DSN "dsnMissingScheme"');
-
- \Enqueue\dsn_to_context('dsnMissingScheme');
- }
-
- public function testThrowIfDsnNotSupported()
- {
- $this->expectException(\LogicException::class);
- $this->expectExceptionMessage('The scheme "http" is not supported. Supported "file", "amqp+ext"');
-
- \Enqueue\dsn_to_context('http://schemeNotSupported');
- }
-
- /**
- * @dataProvider provideDSNs
- *
- * @param mixed $dsn
- * @param mixed $expectedFactoryClass
- */
- public function testReturnsExpectedFactoryInstance($dsn, $expectedFactoryClass)
- {
- $factory = \Enqueue\dsn_to_context($dsn);
-
- $this->assertInstanceOf($expectedFactoryClass, $factory);
- }
-
- public static function provideDSNs()
- {
- yield ['amqp:', AmqpContext::class];
-
- yield ['amqp://user:pass@foo/vhost', AmqpContext::class];
-
- yield ['file:', FsContext::class];
-
- yield ['file://'.sys_get_temp_dir(), FsContext::class];
-
- yield ['null:', NullContext::class];
-
- yield ['redis:', RedisContext::class];
-
- yield ['stomp:', StompContext::class];
-
- yield ['sqs:', SqsContext::class];
-
- yield ['gps:', GpsContext::class];
- }
-}
diff --git a/pkg/enqueue/Tests/ResourcesTest.php b/pkg/enqueue/Tests/ResourcesTest.php
new file mode 100644
index 000000000..0131b4c49
--- /dev/null
+++ b/pkg/enqueue/Tests/ResourcesTest.php
@@ -0,0 +1,130 @@
+assertTrue($rc->isFinal());
+ }
+
+ public function testShouldConstructorBePrivate()
+ {
+ $rc = new \ReflectionClass(Resources::class);
+
+ $this->assertTrue($rc->getConstructor()->isPrivate());
+ }
+
+ public function testShouldGetAvailableConnectionsInExpectedFormat()
+ {
+ $availableConnections = Resources::getAvailableConnections();
+
+ $this->assertInternalType('array', $availableConnections);
+ $this->assertArrayHasKey(RedisConnectionFactory::class, $availableConnections);
+
+ $connectionInfo = $availableConnections[RedisConnectionFactory::class];
+ $this->assertArrayHasKey('schemes', $connectionInfo);
+ $this->assertSame(['redis'], $connectionInfo['schemes']);
+
+ $this->assertArrayHasKey('supportedSchemeExtensions', $connectionInfo);
+ $this->assertSame(['predis', 'phpredis'], $connectionInfo['supportedSchemeExtensions']);
+
+ $this->assertArrayHasKey('package', $connectionInfo);
+ $this->assertSame('enqueue/redis', $connectionInfo['package']);
+ }
+
+ public function testShouldGetKnownConnectionsInExpectedFormat()
+ {
+ $availableConnections = Resources::getKnownConnections();
+
+ $this->assertInternalType('array', $availableConnections);
+ $this->assertArrayHasKey(RedisConnectionFactory::class, $availableConnections);
+
+ $connectionInfo = $availableConnections[RedisConnectionFactory::class];
+ $this->assertArrayHasKey('schemes', $connectionInfo);
+ $this->assertSame(['redis'], $connectionInfo['schemes']);
+
+ $this->assertArrayHasKey('supportedSchemeExtensions', $connectionInfo);
+ $this->assertSame(['predis', 'phpredis'], $connectionInfo['supportedSchemeExtensions']);
+
+ $this->assertArrayHasKey('package', $connectionInfo);
+ $this->assertSame('enqueue/redis', $connectionInfo['package']);
+ }
+
+ public function testThrowsIfConnectionClassNotImplementConnectionFactoryInterfaceOnAddConnection()
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('The connection factory class "stdClass" must implement "Interop\Queue\PsrConnectionFactory" interface.');
+
+ Resources::addConnection(\stdClass::class, [], [], 'foo');
+ }
+
+ public function testThrowsIfNoSchemesProvidedOnAddConnection()
+ {
+ $connectionClass = $this->getMockClass(PsrConnectionFactory::class);
+
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Schemes could not be empty.');
+
+ Resources::addConnection($connectionClass, [], [], 'foo');
+ }
+
+ public function testThrowsIfNoPackageProvidedOnAddConnection()
+ {
+ $connectionClass = $this->getMockClass(PsrConnectionFactory::class);
+
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Package name could not be empty.');
+
+ Resources::addConnection($connectionClass, ['foo'], [], '');
+ }
+
+ public function testShouldAllowRegisterConnectionThatIsNotInstalled()
+ {
+ Resources::addConnection('theConnectionClass', ['foo'], [], 'foo');
+
+ $knownConnections = Resources::getKnownConnections();
+ $this->assertInternalType('array', $knownConnections);
+ $this->assertArrayHasKey('theConnectionClass', $knownConnections);
+
+ $availableConnections = Resources::getAvailableConnections();
+
+ $this->assertInternalType('array', $availableConnections);
+ $this->assertArrayNotHasKey('theConnectionClass', $availableConnections);
+ }
+
+ public function testShouldAllowGetPreviouslyRegisteredConnection()
+ {
+ $connectionClass = $this->getMockClass(PsrConnectionFactory::class);
+
+ Resources::addConnection(
+ $connectionClass,
+ ['fooscheme', 'barscheme'],
+ ['fooextension', 'barextension'],
+ 'foo/bar'
+ );
+
+ $availableConnections = Resources::getAvailableConnections();
+
+ $this->assertInternalType('array', $availableConnections);
+ $this->assertArrayHasKey($connectionClass, $availableConnections);
+
+ $connectionInfo = $availableConnections[$connectionClass];
+ $this->assertArrayHasKey('schemes', $connectionInfo);
+ $this->assertSame(['fooscheme', 'barscheme'], $connectionInfo['schemes']);
+
+ $this->assertArrayHasKey('supportedSchemeExtensions', $connectionInfo);
+ $this->assertSame(['fooextension', 'barextension'], $connectionInfo['supportedSchemeExtensions']);
+
+ $this->assertArrayHasKey('package', $connectionInfo);
+ $this->assertSame('foo/bar', $connectionInfo['package']);
+ }
+}
diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
index a97fb86b8..5b480d809 100644
--- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
+++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
@@ -9,6 +9,7 @@
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\Processor;
use Symfony\Component\DependencyInjection\ContainerBuilder;
+use Symfony\Component\DependencyInjection\Reference;
class DefaultTransportFactoryTest extends TestCase
{
@@ -181,27 +182,27 @@ public function testShouldCreateDriverFromAlias()
$this->assertEquals($driverId, (string) $context);
}
- /**
- * @dataProvider provideDSNs
- *
- * @param mixed $dsn
- * @param mixed $expectedName
- */
- public function testShouldCreateConnectionFactoryFromDSN($dsn, $expectedName)
+ public function testShouldCreateConnectionFactoryFromDSN()
{
$container = new ContainerBuilder();
$transport = new DefaultTransportFactory();
- $serviceId = $transport->createConnectionFactory($container, ['dsn' => $dsn]);
+ $serviceId = $transport->createConnectionFactory($container, ['dsn' => 'foo://bar/baz']);
$this->assertEquals('enqueue.transport.default.connection_factory', $serviceId);
- $this->assertTrue($container->hasAlias('enqueue.transport.default.connection_factory'));
+ $this->assertTrue($container->hasDefinition('enqueue.transport.default.connection_factory'));
+
+ $this->assertNotEmpty($container->getDefinition('enqueue.transport.default.connection_factory')->getFactory());
$this->assertEquals(
- sprintf('enqueue.transport.%s.connection_factory', $expectedName),
- (string) $container->getAlias('enqueue.transport.default.connection_factory')
- );
+ [new Reference('enqueue.connection_factory_factory'), 'create'],
+ $container->getDefinition('enqueue.transport.default.connection_factory')->getFactory())
+ ;
+ $this->assertSame(
+ ['foo://bar/baz'],
+ $container->getDefinition('enqueue.transport.default.connection_factory')->getArguments())
+ ;
$this->assertTrue($container->hasAlias('enqueue.transport.connection_factory'));
$this->assertEquals(
@@ -210,88 +211,63 @@ public function testShouldCreateConnectionFactoryFromDSN($dsn, $expectedName)
);
}
- /**
- * @dataProvider provideDSNs
- *
- * @param mixed $dsn
- * @param mixed $expectedName
- */
- public function testShouldCreateContextFromDsn($dsn, $expectedName)
+ public function testShouldCreateContextFromDsn()
{
$container = new ContainerBuilder();
$transport = new DefaultTransportFactory();
- $serviceId = $transport->createContext($container, ['dsn' => $dsn]);
+ $serviceId = $transport->createContext($container, ['dsn' => 'foo://bar/baz']);
$this->assertEquals('enqueue.transport.default.context', $serviceId);
- $this->assertTrue($container->hasAlias($serviceId));
- $context = $container->getAlias($serviceId);
+ $this->assertNotEmpty($container->getDefinition('enqueue.transport.default.context')->getFactory());
$this->assertEquals(
- sprintf('enqueue.transport.%s.context', $expectedName),
- (string) $context
- );
+ [new Reference('enqueue.transport.default.connection_factory'), 'createContext'],
+ $container->getDefinition('enqueue.transport.default.context')->getFactory())
+ ;
+ $this->assertSame(
+ [],
+ $container->getDefinition('enqueue.transport.default.context')->getArguments())
+ ;
$this->assertTrue($container->hasAlias('enqueue.transport.context'));
- $context = $container->getAlias('enqueue.transport.context');
- $this->assertEquals($serviceId, (string) $context);
+ $this->assertEquals(
+ 'enqueue.transport.default.context',
+ (string) $container->getAlias('enqueue.transport.context')
+ );
}
- /**
- * @dataProvider provideDSNs
- *
- * @param mixed $dsn
- * @param mixed $expectedName
- */
- public function testShouldCreateDriverFromDsn($dsn, $expectedName)
+ public function testShouldCreateDriverFromDsn()
{
$container = new ContainerBuilder();
$transport = new DefaultTransportFactory();
- $driverId = $transport->createDriver($container, ['dsn' => $dsn]);
+ $serviceId = $transport->createDriver($container, ['dsn' => 'foo://bar/baz', 'foo' => 'fooVal']);
- $this->assertEquals('enqueue.client.default.driver', $driverId);
+ $this->assertEquals('enqueue.client.default.driver', $serviceId);
- $this->assertTrue($container->hasAlias($driverId));
- $context = $container->getAlias($driverId);
+ $this->assertTrue($container->hasDefinition('enqueue.client.default.driver'));
+
+ $this->assertNotEmpty($container->getDefinition('enqueue.client.default.driver')->getFactory());
$this->assertEquals(
- sprintf('enqueue.client.%s.driver', $expectedName),
- (string) $context
- );
+ [new Reference('enqueue.client.driver_factory'), 'create'],
+ $container->getDefinition('enqueue.client.default.driver')->getFactory())
+ ;
+ $this->assertEquals(
+ [
+ new Reference('enqueue.transport.default.connection_factory'),
+ 'foo://bar/baz',
+ ['dsn' => 'foo://bar/baz', 'foo' => 'fooVal'],
+ ],
+ $container->getDefinition('enqueue.client.default.driver')->getArguments())
+ ;
$this->assertTrue($container->hasAlias('enqueue.client.driver'));
- $context = $container->getAlias('enqueue.client.driver');
- $this->assertEquals($driverId, (string) $context);
- }
-
- public static function provideDSNs()
- {
- yield ['amqp+ext:', 'default_amqp'];
-
- yield ['amqp+lib:', 'default_amqp'];
-
- yield ['amqp+bunny:', 'default_amqp'];
-
- yield ['null:', 'default_null'];
-
- yield ['file:', 'default_fs'];
-
- yield ['mysql:', 'default_dbal'];
-
- yield ['pgsql:', 'default_dbal'];
-
- yield ['gps:', 'default_gps'];
-
- yield ['sqs:', 'default_sqs'];
-
- yield ['redis:', 'default_redis'];
-
- yield ['stomp:', 'default_stomp'];
-
- yield ['kafka:', 'default_kafka'];
-
- yield ['mongodb:', 'default_mongodb'];
+ $this->assertEquals(
+ 'enqueue.client.default.driver',
+ (string) $container->getAlias('enqueue.client.driver')
+ );
}
}
diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json
index 5d603a859..e61270c7d 100644
--- a/pkg/enqueue/composer.json
+++ b/pkg/enqueue/composer.json
@@ -56,7 +56,6 @@
},
"autoload": {
"psr-4": { "Enqueue\\": "" },
- "files": ["functions_include.php"],
"exclude-from-classmap": [
"/Tests/"
]
diff --git a/pkg/enqueue/functions.php b/pkg/enqueue/functions.php
deleted file mode 100644
index 2fce13487..000000000
--- a/pkg/enqueue/functions.php
+++ /dev/null
@@ -1,181 +0,0 @@
-createContext();
-}
-
-/**
- * @param PsrContext $c
- * @param string $topic
- * @param string $body
- */
-function send_topic(PsrContext $c, $topic, $body)
-{
- $topic = $c->createTopic($topic);
- $message = $c->createMessage($body);
-
- $c->createProducer()->send($topic, $message);
-}
-
-/**
- * @param PsrContext $c
- * @param string $queue
- * @param string $body
- */
-function send_queue(PsrContext $c, $queue, $body)
-{
- $queue = $c->createQueue($queue);
- $message = $c->createMessage($body);
-
- $c->createProducer()->send($queue, $message);
-}
-
-/**
- * @param PsrContext $c
- * @param string $queueName
- * @param callable $callback
- */
-function consume(PsrContext $c, string $queueName, callable $callback)
-{
- $queueConsumer = new QueueConsumer($c);
- $queueConsumer->bindCallback($queueName, $callback);
-
- $queueConsumer->consume();
-}
diff --git a/pkg/enqueue/functions_include.php b/pkg/enqueue/functions_include.php
deleted file mode 100644
index cf5502ab1..000000000
--- a/pkg/enqueue/functions_include.php
+++ /dev/null
@@ -1,6 +0,0 @@
-register('enqueue.connection_factory_factory', ConnectionFactoryFactory::class);
+
+ $container->register('enqueue.client.driver_factory', DriverFactory::class)
+ ->addArgument(new Reference('enqueue.client.config'))
+ ->addArgument(new Reference('enqueue.client.meta.queue_meta_registry'))
+ ;
+
$container->register('enqueue.client.rpc_factory', RpcFactory::class)
->setPublic(true)
->setArguments([
new Reference('enqueue.transport.context'),
- ]);
+ ])
+ ;
$container->register('enqueue.client.producer', Producer::class)
->setPublic(true)
->setArguments([
new Reference('enqueue.client.driver'),
new Reference('enqueue.client.rpc_factory'),
- ]);
+ ])
+ ;
$container->setAlias('enqueue.client.producer_v2', new Alias('enqueue.client.producer', true));