Initial commit
This commit is contained in:
150
src/NtfiDaemon.php
Normal file
150
src/NtfiDaemon.php
Normal file
@@ -0,0 +1,150 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\Ntfi;
|
||||
|
||||
use Psr\Http\Message\ResponseInterface;
|
||||
use Psr\Http\Message\ServerRequestInterface;
|
||||
use React\Http\Browser;
|
||||
use React\Http\HttpServer;
|
||||
use React\Http\Message\Response;
|
||||
use React\Promise\Deferred;
|
||||
use React\Promise\Promise;
|
||||
use React\Promise\PromiseInterface;
|
||||
use React\Socket\TcpServer;
|
||||
use Symfony\Component\Yaml\Yaml;
|
||||
|
||||
class NtfiDaemon
|
||||
{
|
||||
|
||||
private ?string $configFile = null;
|
||||
|
||||
private ?HttpServer $httpServer = null;
|
||||
|
||||
private array $servers = [];
|
||||
|
||||
private array $channels = [];
|
||||
|
||||
public function setConfigFile(?string $filename): self
|
||||
{
|
||||
if ($filename && !file_exists($filename)) {
|
||||
throw new \Exception("Configuration file not found: {$filename}");
|
||||
}
|
||||
$this->configFile = $filename;
|
||||
return $this;
|
||||
}
|
||||
|
||||
private function readConfig(): array
|
||||
{
|
||||
$config = [
|
||||
'listen' => '127.0.0.1:13000',
|
||||
'servers' => [
|
||||
'default' => [
|
||||
'server' => 'ntfy.sh'
|
||||
]
|
||||
],
|
||||
'channels' => [
|
||||
'{topic}' => [
|
||||
'destination' => 'default/{topic}'
|
||||
]
|
||||
]
|
||||
];
|
||||
if ($this->configFile) {
|
||||
$config = array_merge(
|
||||
$config,
|
||||
Yaml::parseFile($this->configFile)
|
||||
);
|
||||
}
|
||||
|
||||
return $config;
|
||||
}
|
||||
|
||||
public function start(): self
|
||||
{
|
||||
$config = $this->readConfig();
|
||||
|
||||
$this->servers = $config['servers'];
|
||||
printf("Configured servers: %s\n", join(", ",array_keys($this->servers)));
|
||||
$this->channels = $config['channels'];
|
||||
printf("Configured channels:\n");
|
||||
foreach ($this->channels as $channel=>$info) {
|
||||
printf(" %s -> %s\n", $channel, $info['destination']??'?');
|
||||
}
|
||||
$this->setupHttp($config['listen']);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
private function setupHttp(string $listen): void
|
||||
{
|
||||
$this->httpServer = new HttpServer(
|
||||
$this->onRequest(...)
|
||||
);
|
||||
$listener = new TcpServer($listen);
|
||||
$this->httpServer->listen($listener);
|
||||
}
|
||||
|
||||
private function onRequest(ServerRequestInterface $request)
|
||||
{
|
||||
$path = trim($request->getUri()->getPath(), '/');
|
||||
$type = $request->getHeaderLine("content-type");
|
||||
$body = $request->getBody();
|
||||
|
||||
try {
|
||||
$this->doPublish($path, $type, $body);
|
||||
} catch (\Exception $e) {
|
||||
return Response::plaintext($e->getMessage())->withStatus(500);
|
||||
}
|
||||
|
||||
return Response::plaintext("OK");
|
||||
}
|
||||
|
||||
private function doPublish(string $path, string $type, string $body): PromiseInterface
|
||||
{
|
||||
foreach ($this->channels as $pattern => $channel) {
|
||||
$re = '/^'.preg_replace('/\{(.+?)\}/', '(?P<$1>[a-zA-Z0-9-_]+)', $pattern).'$/';
|
||||
// printf("pattern={%s} re={%s}\n", $pattern, $re);
|
||||
if (preg_match($re, $path, $match)) {
|
||||
$dest = $channel['destination'];
|
||||
if (!str_contains($dest,'/')) {
|
||||
$connection = 'default';
|
||||
} else {
|
||||
[$connection,$dest] = explode('/',$dest,2);
|
||||
}
|
||||
$dest = preg_replace_callback('/\{(.+?)\}/', fn($m) => $match[$m[1]]??$m, $dest);
|
||||
return $this->publish($connection, $dest, $type, $body);
|
||||
}
|
||||
}
|
||||
return new Promise(function (callable $resolve, callable $reject) use ($path) {
|
||||
$reject(new \Exception("No channel matching '{$path}'"));
|
||||
});
|
||||
}
|
||||
|
||||
private function publish(string $connection, string $topic, string $type, string $body)
|
||||
{
|
||||
$d = new Deferred();
|
||||
$serverInfo = $this->servers[$connection] ?? throw new \Exception("The server '{$connection}' is not configured");
|
||||
|
||||
if (isset($serverInfo['token']) && $serverInfo['token']) {
|
||||
$browser = new Browser();
|
||||
$browser->withHeader("authorization", "Bearer {$serverInfo['token']}");
|
||||
}
|
||||
|
||||
$url = sprintf("https://%s/%s", $serverInfo['server'], $topic);
|
||||
echo $url;
|
||||
$d->resolve(new Response());
|
||||
return $d->promise();
|
||||
|
||||
$browser->post($url, [
|
||||
'content-type' => $type,
|
||||
], $body)->then(
|
||||
function (ResponseInterface $response) use ($d) {
|
||||
$d->resolve($response);
|
||||
},
|
||||
function (\Throwable $t) use ($d) {
|
||||
$d->reject($t);
|
||||
}
|
||||
);
|
||||
|
||||
return $d->promise();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user