Pipe improvements, misc cleanup

* Pipe improvements; better filter code, pipeline etc.
* Moved commands in PDO plugin to dedicated namespace
This commit is contained in:
Chris 2021-12-23 15:31:26 +01:00
parent 9050c74a08
commit f4257b39e4
11 changed files with 254 additions and 15 deletions

View File

@ -30,6 +30,7 @@ class PdoShell {
private ?array $lastQuery = null; private ?array $lastQuery = null;
#[EnumSetting('output', [ 'table', 'vertical', 'dump' ])] #[EnumSetting('output', [ 'table', 'vertical', 'dump' ])]
#[EnumSetting('table.style', [ 'box', 'compact', 'borderless' ])]
private array $defaultOptions = [ private array $defaultOptions = [
'output' => 'table', 'output' => 'table',
'table.maxwidth' => 40, 'table.maxwidth' => 40,

View File

@ -1,6 +1,6 @@
<?php <?php
namespace SparkPlug\Com\Noccy\Pdo; namespace SparkPlug\Com\Noccy\Pdo\Commands;
use Spark\Commands\Command; use Spark\Commands\Command;
use Symfony\Component\Console\Helper\Table; use Symfony\Component\Console\Helper\Table;

View File

@ -1,6 +1,6 @@
<?php <?php
namespace SparkPlug\Com\Noccy\Pdo; namespace SparkPlug\Com\Noccy\Pdo\Commands;
use Spark\Commands\Command; use Spark\Commands\Command;
use Symfony\Component\Console\Helper\Table; use Symfony\Component\Console\Helper\Table;

View File

@ -1,6 +1,6 @@
<?php <?php
namespace SparkPlug\Com\Noccy\Pdo; namespace SparkPlug\Com\Noccy\Pdo\Commands;
use Spark\Commands\Command; use Spark\Commands\Command;
use Symfony\Component\Console\Helper\Table; use Symfony\Component\Console\Helper\Table;

View File

@ -7,8 +7,8 @@ use SparkPlug;
class PdoPlugin extends SparkPlug { class PdoPlugin extends SparkPlug {
public function load() public function load()
{ {
register_command(new PdoQueryCommand()); register_command(new Commands\PdoQueryCommand());
register_command(new PdoExecCommand()); register_command(new Commands\PdoExecCommand());
} }
} }

View File

@ -2,6 +2,8 @@
use Spark\Commands\Command; use Spark\Commands\Command;
use Spark\Environment\Environment; use Spark\Environment\Environment;
use Spark\Pipe\Filters\FilterInterface;
use Spark\Pipe\Filters\PhpFilter;
use Spark\Resource\ResourceType; use Spark\Resource\ResourceType;
use Spark\SparkApplication; use Spark\SparkApplication;
@ -87,7 +89,7 @@ function read_config($file=null) {
$FILTERS = []; $FILTERS = [];
function register_filter(string $name, callable $filter) { function register_filter(string $name, string|callable $filter) {
global $FILTERS; global $FILTERS;
$FILTERS[$name] = $filter; $FILTERS[$name] = $filter;
} }
@ -98,8 +100,18 @@ function get_registered_filters(): array
return array_keys($FILTERS); return array_keys($FILTERS);
} }
function get_filter(string $name): ?callable function get_filter(string $name, array $args=[]): null|FilterInterface|callable
{ {
global $FILTERS; global $FILTERS;
return $FILTERS[$name]??null; $filter = $FILTERS[$name]??null;
if (is_string($filter) && class_exists($filter)) {
$filter = new $filter;
}
if (is_callable($filter)) {
$filter = new PhpFilter($filter);
}
if ($filter instanceof FilterInterface) {
$filter->setArguments($args);
}
return $filter;
} }

View File

@ -4,6 +4,8 @@ namespace Spark\Commands;
use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Attribute\AsCommand;
use Spark\Commands\Command; use Spark\Commands\Command;
use Spark\Pipe\Filters\ProgressFilter;
use Spark\Pipe\Pipeline;
use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputOption;
@ -17,9 +19,9 @@ class PipeCommand extends Command
protected function configure() protected function configure()
{ {
$this->addOption("list-filters", null, InputOption::VALUE_NONE, "List the defined filters"); $this->addOption("list-filters", null, InputOption::VALUE_NONE, "List the defined filters");
$this->addOption("fdin", null, InputOption::VALUE_REQUIRED, "Input fd, for reading from", 0); $this->addOption("input", "i", InputOption::VALUE_REQUIRED, "Input file or fd, for reading from", 0);
$this->addOption("fdout", null, InputOption::VALUE_REQUIRED, "Output fd, for writing to", 1); $this->addOption("output", "o", InputOption::VALUE_REQUIRED, "Output file or fd, for writing to", 1);
$this->addOption("fderr", null, InputOption::VALUE_REQUIRED, "Error fd, for progress report and status", 2); $this->addOption("error", "e", InputOption::VALUE_REQUIRED, "Error file or fd, for progress report and status", 2);
$this->addArgument("filter", InputArgument::OPTIONAL, "Pipe filter"); $this->addArgument("filter", InputArgument::OPTIONAL, "Pipe filter");
$this->addArgument("args", InputArgument::OPTIONAL|InputArgument::IS_ARRAY, "Arguments to the script"); $this->addArgument("args", InputArgument::OPTIONAL|InputArgument::IS_ARRAY, "Arguments to the script");
$this->registerDefaultFilters(); $this->registerDefaultFilters();
@ -35,6 +37,7 @@ class PipeCommand extends Command
$hashed = password_hash($trimmed, PASSWORD_BCRYPT); $hashed = password_hash($trimmed, PASSWORD_BCRYPT);
return str_replace($trimmed, $hashed, $in); return str_replace($trimmed, $hashed, $in);
}); });
register_filter("progress", ProgressFilter::class);
} }
protected function execute(InputInterface $input, OutputInterface $output) protected function execute(InputInterface $input, OutputInterface $output)
@ -46,17 +49,44 @@ class PipeCommand extends Command
return Command::SUCCESS; return Command::SUCCESS;
} }
$fdin = "php://fd/".$input->getOption("fdin"); $fdin = $input->getOption("input");
$fdout = "php://fd/".$input->getOption("fdout"); $fdout = $input->getOption("output");
$fderr = "php://fd/".$input->getOption("fderr"); $fderr = $input->getOption("error");
$filterargs = $input->getArgument('args');
$args = [];
foreach ($filterargs as $arg) {
if (str_contains($arg, '=')) {
[$k,$v] = explode("=", $arg, 2);
$args[$k] = $v;
} else {
if (str_starts_with($arg, 'no-')) {
$args[substr($arg,3)] = false;
} else {
$args[$arg] = true;
}
}
}
$filtername = $input->getArgument("filter"); $filtername = $input->getArgument("filter");
if ($filtername) { if ($filtername) {
$filter = get_filter($filtername); $filter = get_filter($filtername, $args);
} else { } else {
$filter = null; $filter = null;
} }
$pipeline = new Pipeline();
$pipeline->setInputFile($fdin);
$pipeline->setOutputFile($fdout);
if ($filter) {
$pipeline->addFilter($filter);
}
$pipeline->run();
/*
$fin = fopen($fdin, "rb"); $fin = fopen($fdin, "rb");
$fout = fopen($fdout, "wb"); $fout = fopen($fdout, "wb");
while (!feof($fin)) { while (!feof($fin)) {
@ -64,6 +94,7 @@ class PipeCommand extends Command
if (is_callable($filter)) $buf = $filter($buf); if (is_callable($filter)) $buf = $filter($buf);
fputs($fout, $buf); fputs($fout, $buf);
} }
*/
return Command::SUCCESS; return Command::SUCCESS;
} }

View File

@ -0,0 +1,10 @@
<?php
namespace Spark\Pipe\Filters;
interface FilterInterface
{
public function setArguments(array $args);
public function pipe(?string $chunk);
}

View File

@ -0,0 +1,32 @@
<?php
namespace Spark\Pipe\Filters;
class PhpFilter implements FilterInterface
{
private $method;
private array $args = [];
private static $defaultArgs = [
];
public function __construct(string|callable $method)
{
$this->method = $method;
}
public function setArguments(array $args)
{
$this->args = array_merge(self::$defaultArgs, $args);
}
public function pipe(?string $chunk)
{
if ($chunk === null) {
return;
}
return call_user_func($this->method, $chunk);
}
}

View File

@ -0,0 +1,98 @@
<?php
namespace Spark\Pipe\Filters;
class ProgressFilter implements FilterInterface
{
private array $args = [];
private static $defaultArgs = [
'max' => null,
];
private ?int $max = null;
private int $current = 0;
private int $lastCurrent = 0;
private ?int $nextRefresh = null;
private array $deltas = [];
private array $times = [];
public function setArguments(array $args)
{
$this->args = array_merge(self::$defaultArgs, $args);
$max = $this->args['max'];
if (str_starts_with($max, '@')) {
$this->max = filesize(substr($max,1));
} else {
$this->max = $max;
}
}
public function pipe(?string $chunk)
{
if ($chunk === null) {
$this->nextRefresh = 0;
$this->refresh();
fwrite(STDERR, "\n");
return;
}
$this->current += strlen($chunk);
$this->refresh();
return $chunk;
}
private function refresh()
{
if (microtime(true) < $this->nextRefresh) {
return;
}
$now = microtime(true);
$delta = $this->current - $this->lastCurrent;
array_push($this->deltas, $delta);
array_push($this->times, $now);
while (count($this->deltas) > 10) {
array_shift($this->deltas);
array_shift($this->times);
}
$deltaTime = end($this->times) - reset($this->times);
if ($deltaTime > 0) {
$rate = array_sum($this->deltas) / $deltaTime;
$rateu = "b/s";
if ($rate > 1024) {
$rate /= 1024;
$rateu = "KiB/s";
if ($rate > 1024) {
$rate /= 1024;
$rateu = "MiB/s";
if ($rate > 1024) {
$rate /= 1024;
$rateu = "GiB/s";
}
}
}
} else {
$rate = null;
}
if ($this->max) {
fprintf(STDERR, "\r%.1fMiB/%.1fMiB (%.1f%%) ", $this->current/1024/1024, $this->max/1024/1024, 100/$this->max*$this->current);
} else {
fprintf(STDERR, "\r%.1fMiB ", $this->current/1024/1024);
}
if ($rate) {
fprintf(STDERR, "(%.1f%s)", $rate, $rateu);
}
fprintf(STDERR, "\e[K");
$this->lastCurrent = $this->current;
$this->nextRefresh = $now + .1;
}
}

55
src/Pipe/Pipeline.php Normal file
View File

@ -0,0 +1,55 @@
<?php
namespace Spark\Pipe;
use Spark\Pipe\Filters\FilterInterface;
class Pipeline
{
private $fdin;
private $fdout;
private array $filters = [];
public function setInputFile(string $filename)
{
if (ctype_digit($filename) && !file_exists($filename)) {
$filename = "php://fd/" . $filename;
}
$this->fdin = fopen($filename, 'rb');
}
public function setOutputFile(string $filename)
{
if (ctype_digit($filename) && !file_exists($filename)) {
$filename = "php://fd/" . $filename;
}
$this->fdout = fopen($filename, 'wb');
}
public function addFilter(FilterInterface $filter)
{
$this->filters[] = $filter;
}
public function run()
{
while (!feof($this->fdin)) {
ob_start();
$buf = fread($this->fdin, 8192);
foreach ($this->filters as $filter) {
$buf = $filter->pipe($buf);
}
ob_end_clean();
fwrite($this->fdout, $buf);
}
ob_start();
foreach ($this->filters as $filter) {
$filter->pipe(null);
}
ob_end_clean();
}
}