From f4257b39e45016757f5120e722d147ee4ffdf7bc Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Thu, 23 Dec 2021 15:31:26 +0100 Subject: [PATCH] Pipe improvements, misc cleanup * Pipe improvements; better filter code, pipeline etc. * Moved commands in PDO plugin to dedicated namespace --- .../com.noccy.pdo.shell/Shell/PdoShell.php | 1 + .../{ => Commands}/PdoExecCommand.php | 2 +- .../{ => Commands}/PdoQueryCommand.php | 2 +- .../{ => Commands}/PdoStoreCommand.php | 2 +- plugins/com.noccy.pdo/sparkplug.php | 4 +- runtime/functions.php | 18 +++- src/Commands/PipeCommand.php | 45 +++++++-- src/Pipe/Filters/FilterInterface.php | 10 ++ src/Pipe/Filters/PhpFilter.php | 32 ++++++ src/Pipe/Filters/ProgressFilter.php | 98 +++++++++++++++++++ src/Pipe/Pipeline.php | 55 +++++++++++ 11 files changed, 254 insertions(+), 15 deletions(-) rename plugins/com.noccy.pdo/{ => Commands}/PdoExecCommand.php (96%) rename plugins/com.noccy.pdo/{ => Commands}/PdoQueryCommand.php (98%) rename plugins/com.noccy.pdo/{ => Commands}/PdoStoreCommand.php (98%) create mode 100644 src/Pipe/Filters/FilterInterface.php create mode 100644 src/Pipe/Filters/PhpFilter.php create mode 100644 src/Pipe/Filters/ProgressFilter.php create mode 100644 src/Pipe/Pipeline.php diff --git a/plugins/com.noccy.pdo.shell/Shell/PdoShell.php b/plugins/com.noccy.pdo.shell/Shell/PdoShell.php index d1bd24c..5b2d78d 100644 --- a/plugins/com.noccy.pdo.shell/Shell/PdoShell.php +++ b/plugins/com.noccy.pdo.shell/Shell/PdoShell.php @@ -30,6 +30,7 @@ class PdoShell { private ?array $lastQuery = null; #[EnumSetting('output', [ 'table', 'vertical', 'dump' ])] + #[EnumSetting('table.style', [ 'box', 'compact', 'borderless' ])] private array $defaultOptions = [ 'output' => 'table', 'table.maxwidth' => 40, diff --git a/plugins/com.noccy.pdo/PdoExecCommand.php b/plugins/com.noccy.pdo/Commands/PdoExecCommand.php similarity index 96% rename from plugins/com.noccy.pdo/PdoExecCommand.php rename to plugins/com.noccy.pdo/Commands/PdoExecCommand.php index 13f2f3f..9da2dfa 100644 --- a/plugins/com.noccy.pdo/PdoExecCommand.php +++ b/plugins/com.noccy.pdo/Commands/PdoExecCommand.php @@ -1,6 +1,6 @@ setArguments($args); + } + return $filter; } \ No newline at end of file diff --git a/src/Commands/PipeCommand.php b/src/Commands/PipeCommand.php index 20fbf64..07aac88 100644 --- a/src/Commands/PipeCommand.php +++ b/src/Commands/PipeCommand.php @@ -4,6 +4,8 @@ namespace Spark\Commands; use Symfony\Component\Console\Attribute\AsCommand; use Spark\Commands\Command; +use Spark\Pipe\Filters\ProgressFilter; +use Spark\Pipe\Pipeline; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; @@ -17,9 +19,9 @@ class PipeCommand extends Command protected function configure() { $this->addOption("list-filters", null, InputOption::VALUE_NONE, "List the defined filters"); - $this->addOption("fdin", null, InputOption::VALUE_REQUIRED, "Input fd, for reading from", 0); - $this->addOption("fdout", null, InputOption::VALUE_REQUIRED, "Output fd, for writing to", 1); - $this->addOption("fderr", null, InputOption::VALUE_REQUIRED, "Error fd, for progress report and status", 2); + $this->addOption("input", "i", InputOption::VALUE_REQUIRED, "Input file or fd, for reading from", 0); + $this->addOption("output", "o", InputOption::VALUE_REQUIRED, "Output file or fd, for writing to", 1); + $this->addOption("error", "e", InputOption::VALUE_REQUIRED, "Error file or fd, for progress report and status", 2); $this->addArgument("filter", InputArgument::OPTIONAL, "Pipe filter"); $this->addArgument("args", InputArgument::OPTIONAL|InputArgument::IS_ARRAY, "Arguments to the script"); $this->registerDefaultFilters(); @@ -35,6 +37,7 @@ class PipeCommand extends Command $hashed = password_hash($trimmed, PASSWORD_BCRYPT); return str_replace($trimmed, $hashed, $in); }); + register_filter("progress", ProgressFilter::class); } protected function execute(InputInterface $input, OutputInterface $output) @@ -46,17 +49,44 @@ class PipeCommand extends Command return Command::SUCCESS; } - $fdin = "php://fd/".$input->getOption("fdin"); - $fdout = "php://fd/".$input->getOption("fdout"); - $fderr = "php://fd/".$input->getOption("fderr"); + $fdin = $input->getOption("input"); + $fdout = $input->getOption("output"); + $fderr = $input->getOption("error"); + + + $filterargs = $input->getArgument('args'); + $args = []; + foreach ($filterargs as $arg) { + if (str_contains($arg, '=')) { + [$k,$v] = explode("=", $arg, 2); + $args[$k] = $v; + } else { + if (str_starts_with($arg, 'no-')) { + $args[substr($arg,3)] = false; + } else { + $args[$arg] = true; + } + } + } $filtername = $input->getArgument("filter"); if ($filtername) { - $filter = get_filter($filtername); + $filter = get_filter($filtername, $args); } else { $filter = null; } + + $pipeline = new Pipeline(); + $pipeline->setInputFile($fdin); + $pipeline->setOutputFile($fdout); + if ($filter) { + $pipeline->addFilter($filter); + } + + $pipeline->run(); + + /* $fin = fopen($fdin, "rb"); $fout = fopen($fdout, "wb"); while (!feof($fin)) { @@ -64,6 +94,7 @@ class PipeCommand extends Command if (is_callable($filter)) $buf = $filter($buf); fputs($fout, $buf); } + */ return Command::SUCCESS; } diff --git a/src/Pipe/Filters/FilterInterface.php b/src/Pipe/Filters/FilterInterface.php new file mode 100644 index 0000000..8ae2296 --- /dev/null +++ b/src/Pipe/Filters/FilterInterface.php @@ -0,0 +1,10 @@ +method = $method; + } + + public function setArguments(array $args) + { + $this->args = array_merge(self::$defaultArgs, $args); + } + + public function pipe(?string $chunk) + { + if ($chunk === null) { + return; + } + return call_user_func($this->method, $chunk); + } + +} \ No newline at end of file diff --git a/src/Pipe/Filters/ProgressFilter.php b/src/Pipe/Filters/ProgressFilter.php new file mode 100644 index 0000000..38f6e8b --- /dev/null +++ b/src/Pipe/Filters/ProgressFilter.php @@ -0,0 +1,98 @@ + null, + ]; + + private ?int $max = null; + + private int $current = 0; + + private int $lastCurrent = 0; + + private ?int $nextRefresh = null; + + private array $deltas = []; + + private array $times = []; + + public function setArguments(array $args) + { + $this->args = array_merge(self::$defaultArgs, $args); + $max = $this->args['max']; + if (str_starts_with($max, '@')) { + $this->max = filesize(substr($max,1)); + } else { + $this->max = $max; + } + } + + public function pipe(?string $chunk) + { + if ($chunk === null) { + $this->nextRefresh = 0; + $this->refresh(); + fwrite(STDERR, "\n"); + return; + } + $this->current += strlen($chunk); + $this->refresh(); + return $chunk; + } + + private function refresh() + { + if (microtime(true) < $this->nextRefresh) { + return; + } + + $now = microtime(true); + $delta = $this->current - $this->lastCurrent; + array_push($this->deltas, $delta); + array_push($this->times, $now); + while (count($this->deltas) > 10) { + array_shift($this->deltas); + array_shift($this->times); + } + + $deltaTime = end($this->times) - reset($this->times); + + if ($deltaTime > 0) { + $rate = array_sum($this->deltas) / $deltaTime; + $rateu = "b/s"; + if ($rate > 1024) { + $rate /= 1024; + $rateu = "KiB/s"; + if ($rate > 1024) { + $rate /= 1024; + $rateu = "MiB/s"; + if ($rate > 1024) { + $rate /= 1024; + $rateu = "GiB/s"; + } + } + } + } else { + $rate = null; + } + + if ($this->max) { + fprintf(STDERR, "\r%.1fMiB/%.1fMiB (%.1f%%) ", $this->current/1024/1024, $this->max/1024/1024, 100/$this->max*$this->current); + } else { + fprintf(STDERR, "\r%.1fMiB ", $this->current/1024/1024); + } + if ($rate) { + fprintf(STDERR, "(%.1f%s)", $rate, $rateu); + } + fprintf(STDERR, "\e[K"); + + $this->lastCurrent = $this->current; + $this->nextRefresh = $now + .1; + } +} \ No newline at end of file diff --git a/src/Pipe/Pipeline.php b/src/Pipe/Pipeline.php new file mode 100644 index 0000000..693f3dd --- /dev/null +++ b/src/Pipe/Pipeline.php @@ -0,0 +1,55 @@ +fdin = fopen($filename, 'rb'); + } + + public function setOutputFile(string $filename) + { + if (ctype_digit($filename) && !file_exists($filename)) { + $filename = "php://fd/" . $filename; + } + $this->fdout = fopen($filename, 'wb'); + } + + public function addFilter(FilterInterface $filter) + { + $this->filters[] = $filter; + } + + public function run() + { + while (!feof($this->fdin)) { + ob_start(); + $buf = fread($this->fdin, 8192); + foreach ($this->filters as $filter) { + $buf = $filter->pipe($buf); + } + ob_end_clean(); + fwrite($this->fdout, $buf); + } + ob_start(); + foreach ($this->filters as $filter) { + $filter->pipe(null); + } + ob_end_clean(); + } + +} \ No newline at end of file