pluginupdate.py: add support for adding/updating individual plugins (#336137)

This commit is contained in:
Matthieu Coudron 2024-11-21 20:26:17 +01:00 committed by GitHub
commit d6c5afdca4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 234 additions and 85 deletions

View File

@ -264,10 +264,15 @@ nix-shell -p vimPluginsUpdater --run 'vim-plugins-updater --github-token=mytoken
Alternatively, set the number of processes to a lower count to avoid rate-limiting. Alternatively, set the number of processes to a lower count to avoid rate-limiting.
```sh ```sh
nix-shell -p vimPluginsUpdater --run 'vim-plugins-updater --proc 1' nix-shell -p vimPluginsUpdater --run 'vim-plugins-updater --proc 1'
``` ```
If you want to update only certain plugins, you can specify them after the `update` command. Note that you must use the same plugin names as the `pkgs/applications/editors/vim/plugins/vim-plugin-names` file.
```sh
nix-shell -p vimPluginsUpdater --run 'vim-plugins-updater update "nvim-treesitter" "LazyVim"'
```
## How to maintain an out-of-tree overlay of vim plugins ? {#vim-out-of-tree-overlays} ## How to maintain an out-of-tree overlay of vim plugins ? {#vim-out-of-tree-overlays}
You can use the updater script to generate basic packages out of a custom vim You can use the updater script to generate basic packages out of a custom vim

View File

@ -32,7 +32,7 @@ from functools import wraps
from multiprocessing.dummy import Pool from multiprocessing.dummy import Pool
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, List, Optional, Tuple, Union from typing import Any, Callable
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
import git import git
@ -94,7 +94,7 @@ def make_request(url: str, token=None) -> urllib.request.Request:
# a dictionary of plugins and their new repositories # a dictionary of plugins and their new repositories
Redirects = Dict["PluginDesc", "Repo"] Redirects = dict["PluginDesc", "Repo"]
class Repo: class Repo:
@ -103,7 +103,7 @@ class Repo:
"""Url to the repo""" """Url to the repo"""
self._branch = branch self._branch = branch
# Redirect is the new Repo to use # Redirect is the new Repo to use
self.redirect: Optional["Repo"] = None self.redirect: "Repo | None" = None
self.token = "dummy_token" self.token = "dummy_token"
@property @property
@ -125,14 +125,14 @@ class Repo:
return True return True
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2) @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
def latest_commit(self) -> Tuple[str, datetime]: def latest_commit(self) -> tuple[str, datetime]:
log.debug("Latest commit") log.debug("Latest commit")
loaded = self._prefetch(None) loaded = self._prefetch(None)
updated = datetime.strptime(loaded["date"], "%Y-%m-%dT%H:%M:%S%z") updated = datetime.strptime(loaded["date"], "%Y-%m-%dT%H:%M:%S%z")
return loaded["rev"], updated return loaded["rev"], updated
def _prefetch(self, ref: Optional[str]): def _prefetch(self, ref: str | None):
cmd = ["nix-prefetch-git", "--quiet", "--fetch-submodules", self.uri] cmd = ["nix-prefetch-git", "--quiet", "--fetch-submodules", self.uri]
if ref is not None: if ref is not None:
cmd.append(ref) cmd.append(ref)
@ -141,7 +141,7 @@ class Repo:
loaded = json.loads(data) loaded = json.loads(data)
return loaded return loaded
def prefetch(self, ref: Optional[str]) -> str: def prefetch(self, ref: str | None) -> str:
log.info("Prefetching %s", self.uri) log.info("Prefetching %s", self.uri)
loaded = self._prefetch(ref) loaded = self._prefetch(ref)
return loaded["sha256"] return loaded["sha256"]
@ -186,7 +186,7 @@ class RepoGitHub(Repo):
return True return True
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2) @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
def latest_commit(self) -> Tuple[str, datetime]: def latest_commit(self) -> tuple[str, datetime]:
commit_url = self.url(f"commits/{self.branch}.atom") commit_url = self.url(f"commits/{self.branch}.atom")
log.debug("Sending request to %s", commit_url) log.debug("Sending request to %s", commit_url)
commit_req = make_request(commit_url, self.token) commit_req = make_request(commit_url, self.token)
@ -252,14 +252,14 @@ class RepoGitHub(Repo):
class PluginDesc: class PluginDesc:
repo: Repo repo: Repo
branch: str branch: str
alias: Optional[str] alias: str | None
@property @property
def name(self): def name(self):
return self.alias or self.repo.name return self.alias or self.repo.name
@staticmethod @staticmethod
def load_from_csv(config: FetchConfig, row: Dict[str, str]) -> "PluginDesc": def load_from_csv(config: FetchConfig, row: dict[str, str]) -> "PluginDesc":
log.debug("Loading row %s", row) log.debug("Loading row %s", row)
branch = row["branch"] branch = row["branch"]
repo = make_repo(row["repo"], branch.strip()) repo = make_repo(row["repo"], branch.strip())
@ -292,7 +292,7 @@ class Plugin:
commit: str commit: str
has_submodules: bool has_submodules: bool
sha256: str sha256: str
date: Optional[datetime] = None date: datetime | None = None
@property @property
def normalized_name(self) -> str: def normalized_name(self) -> str:
@ -303,7 +303,7 @@ class Plugin:
assert self.date is not None assert self.date is not None
return self.date.strftime("%Y-%m-%d") return self.date.strftime("%Y-%m-%d")
def as_json(self) -> Dict[str, str]: def as_json(self) -> dict[str, str]:
copy = self.__dict__.copy() copy = self.__dict__.copy()
del copy["date"] del copy["date"]
return copy return copy
@ -312,7 +312,7 @@ class Plugin:
def load_plugins_from_csv( def load_plugins_from_csv(
config: FetchConfig, config: FetchConfig,
input_file: Path, input_file: Path,
) -> List[PluginDesc]: ) -> list[PluginDesc]:
log.debug("Load plugins from csv %s", input_file) log.debug("Load plugins from csv %s", input_file)
plugins = [] plugins = []
with open(input_file, newline="") as csvfile: with open(input_file, newline="") as csvfile:
@ -359,10 +359,10 @@ class Editor:
name: str, name: str,
root: Path, root: Path,
get_plugins: str, get_plugins: str,
default_in: Optional[Path] = None, default_in: Path | None = None,
default_out: Optional[Path] = None, default_out: Path | None = None,
deprecated: Optional[Path] = None, deprecated: Path | None = None,
cache_file: Optional[str] = None, cache_file: str | None = None,
): ):
log.debug("get_plugins:", get_plugins) log.debug("get_plugins:", get_plugins)
self.name = name self.name = name
@ -388,6 +388,19 @@ class Editor:
fetch_config, args.input_file, editor.deprecated, append=append fetch_config, args.input_file, editor.deprecated, append=append
) )
plugin, _ = prefetch_plugin(pdesc) plugin, _ = prefetch_plugin(pdesc)
if ( # lua updater doesn't support updating individual plugin
self.name != "lua"
):
# update generated.nix
update = self.get_update(
args.input_file,
args.outfile,
fetch_config,
[plugin.normalized_name],
)
update()
autocommit = not args.no_commit autocommit = not args.no_commit
if autocommit: if autocommit:
commit( commit(
@ -404,16 +417,35 @@ class Editor:
"""CSV spec""" """CSV spec"""
print("the update member function should be overridden in subclasses") print("the update member function should be overridden in subclasses")
def get_current_plugins(self, nixpkgs: str) -> List[Plugin]: def get_current_plugins(
self, config: FetchConfig, nixpkgs: str
) -> list[tuple[PluginDesc, Plugin]]:
"""To fill the cache""" """To fill the cache"""
data = run_nix_expr(self.get_plugins, nixpkgs) data = run_nix_expr(self.get_plugins, nixpkgs)
plugins = [] plugins = []
for name, attr in data.items(): for name, attr in data.items():
p = Plugin(name, attr["rev"], attr["submodules"], attr["sha256"]) checksum = attr["checksum"]
plugins.append(p)
# https://github.com/NixOS/nixpkgs/blob/8a335419/pkgs/applications/editors/neovim/build-neovim-plugin.nix#L36
# https://github.com/NixOS/nixpkgs/pull/344478#discussion_r1786646055
version = re.search(r"\d\d\d\d-\d\d?-\d\d?", attr["version"])
if version is None:
raise ValueError(f"Cannot parse version: {attr['version']}")
date = datetime.strptime(version.group(), "%Y-%m-%d")
pdesc = PluginDesc.load_from_string(config, f'{attr["homePage"]} as {name}')
p = Plugin(
attr["pname"],
checksum["rev"],
checksum["submodules"],
checksum["sha256"],
date,
)
plugins.append((pdesc, p))
return plugins return plugins
def load_plugin_spec(self, config: FetchConfig, plugin_file) -> List[PluginDesc]: def load_plugin_spec(self, config: FetchConfig, plugin_file) -> list[PluginDesc]:
"""CSV spec""" """CSV spec"""
return load_plugins_from_csv(config, plugin_file) return load_plugins_from_csv(config, plugin_file)
@ -421,28 +453,115 @@ class Editor:
"""Returns nothing for now, writes directly to outfile""" """Returns nothing for now, writes directly to outfile"""
raise NotImplementedError() raise NotImplementedError()
def get_update(self, input_file: str, outfile: str, config: FetchConfig): def filter_plugins_to_update(
cache: Cache = Cache(self.get_current_plugins(self.nixpkgs), self.cache_file) self, plugin: PluginDesc, to_update: list[str]
) -> bool:
"""Function for filtering out plugins, that user doesn't want to update.
It is mainly used for updating only specific plugins, not all of them.
By default it filters out plugins not present in `to_update`,
assuming `to_update` is a list of plugin names (the same as in the
result expression).
This function is never called if `to_update` is empty.
Feel free to override this function in derived classes.
Note:
Known bug: you have to use a deprecated name, instead of new one.
This is because we resolve deprecations later and can't get new
plugin URL before we request info about it.
Although, we could parse deprecated.json, but it's a whole bunch
of spaghetti code, which I don't want to write.
Arguments:
plugin: Plugin on which you decide whether to ignore or not.
to_update:
List of strings passed to via the `--update` command line parameter.
By default, we assume it is a list of URIs identical to what
is in the input file.
Returns:
True if we should update plugin and False if not.
"""
return plugin.name.replace(".", "-") in to_update
def get_update(
self,
input_file: str,
output_file: str,
config: FetchConfig,
to_update: list[str] | None,
):
if to_update is None:
to_update = []
current_plugins = self.get_current_plugins(config, self.nixpkgs)
current_plugin_specs = self.load_plugin_spec(config, input_file)
cache: Cache = Cache(
[plugin for _description, plugin in current_plugins], self.cache_file
)
_prefetch = functools.partial(prefetch, cache=cache) _prefetch = functools.partial(prefetch, cache=cache)
def update() -> dict: plugins_to_update = (
plugins = self.load_plugin_spec(config, input_file) current_plugin_specs
if len(to_update) == 0
else [
description
for description in current_plugin_specs
if self.filter_plugins_to_update(description, to_update)
]
)
def update() -> Redirects:
if len(plugins_to_update) == 0:
log.error(
"\n\n\n\nIt seems like you provided some arguments to `--update`:\n"
+ ", ".join(to_update)
+ "\nBut after filtering, the result list of plugins is empty\n"
"\n"
"Are you sure you provided the same URIs as in your input file?\n"
"(" + str(input_file) + ")\n\n"
)
return {}
try: try:
pool = Pool(processes=config.proc) pool = Pool(processes=config.proc)
results = pool.map(_prefetch, plugins) results = pool.map(_prefetch, plugins_to_update)
finally: finally:
cache.store() cache.store()
print(f"{len(results)} of {len(current_plugins)} were checked")
# Do only partial update of out file
if len(results) != len(current_plugins):
results = self.merge_results(current_plugins, results)
plugins, redirects = check_results(results) plugins, redirects = check_results(results)
plugins = sorted(plugins, key=lambda v: v[1].normalized_name) plugins = sorted(plugins, key=lambda v: v[1].normalized_name)
self.generate_nix(plugins, outfile) self.generate_nix(plugins, output_file)
return redirects return redirects
return update return update
def merge_results(
self,
current: list[tuple[PluginDesc, Plugin]],
fetched: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
) -> list[tuple[PluginDesc, Exception | Plugin, Repo | None]]:
# transforming this to dict, so lookup is O(1) instead of O(n) (n is len(current))
result: dict[str, tuple[PluginDesc, Exception | Plugin, Repo | None]] = {
# also adding redirect (third item in the result tuple)
pl.normalized_name: (pdesc, pl, None)
for pdesc, pl in current
}
for plugin_desc, plugin, redirect in fetched:
result[plugin.normalized_name] = (plugin_desc, plugin, redirect)
return list(result.values())
@property @property
def attr_path(self): def attr_path(self):
return self.name + "Plugins" return self.name + "Plugins"
@ -544,6 +663,12 @@ class Editor:
description="Update all or a subset of existing plugins", description="Update all or a subset of existing plugins",
add_help=False, add_help=False,
) )
pupdate.add_argument(
"update_only",
default=None,
nargs="*",
help="Plugin URLs to update (must be the same as in the input file)",
)
pupdate.set_defaults(func=self.update) pupdate.set_defaults(func=self.update)
return main return main
@ -587,8 +712,8 @@ class CleanEnvironment(object):
def prefetch_plugin( def prefetch_plugin(
p: PluginDesc, p: PluginDesc,
cache: "Optional[Cache]" = None, cache: "Cache | None" = None,
) -> Tuple[Plugin, Optional[Repo]]: ) -> tuple[Plugin, Repo | None]:
commit = None commit = None
log.info(f"Fetching last commit for plugin {p.name} from {p.repo.uri}@{p.branch}") log.info(f"Fetching last commit for plugin {p.name} from {p.repo.uri}@{p.branch}")
commit, date = p.repo.latest_commit() commit, date = p.repo.latest_commit()
@ -621,10 +746,10 @@ def print_download_error(plugin: PluginDesc, ex: Exception):
def check_results( def check_results(
results: List[Tuple[PluginDesc, Union[Exception, Plugin], Optional[Repo]]], results: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
) -> Tuple[List[Tuple[PluginDesc, Plugin]], Redirects]: ) -> tuple[list[tuple[PluginDesc, Plugin]], Redirects]:
""" """ """ """
failures: List[Tuple[PluginDesc, Exception]] = [] failures: list[tuple[PluginDesc, Exception]] = []
plugins = [] plugins = []
redirects: Redirects = {} redirects: Redirects = {}
for pdesc, result, redirect in results: for pdesc, result, redirect in results:
@ -637,11 +762,10 @@ def check_results(
new_pdesc = PluginDesc(redirect, pdesc.branch, pdesc.alias) new_pdesc = PluginDesc(redirect, pdesc.branch, pdesc.alias)
plugins.append((new_pdesc, result)) plugins.append((new_pdesc, result))
print(f"{len(results) - len(failures)} plugins were checked", end="")
if len(failures) == 0: if len(failures) == 0:
return plugins, redirects return plugins, redirects
else: else:
log.error(f", {len(failures)} plugin(s) could not be downloaded:\n") log.error(f"{len(failures)} plugin(s) could not be downloaded:\n")
for plugin, exception in failures: for plugin, exception in failures:
print_download_error(plugin, exception) print_download_error(plugin, exception)
@ -661,7 +785,7 @@ def make_repo(uri: str, branch) -> Repo:
return repo return repo
def get_cache_path(cache_file_name: str) -> Optional[Path]: def get_cache_path(cache_file_name: str) -> Path | None:
xdg_cache = os.environ.get("XDG_CACHE_HOME", None) xdg_cache = os.environ.get("XDG_CACHE_HOME", None)
if xdg_cache is None: if xdg_cache is None:
home = os.environ.get("HOME", None) home = os.environ.get("HOME", None)
@ -673,7 +797,7 @@ def get_cache_path(cache_file_name: str) -> Optional[Path]:
class Cache: class Cache:
def __init__(self, initial_plugins: List[Plugin], cache_file_name: str) -> None: def __init__(self, initial_plugins: list[Plugin], cache_file_name: str) -> None:
self.cache_file = get_cache_path(cache_file_name) self.cache_file = get_cache_path(cache_file_name)
downloads = {} downloads = {}
@ -682,11 +806,11 @@ class Cache:
downloads.update(self.load()) downloads.update(self.load())
self.downloads = downloads self.downloads = downloads
def load(self) -> Dict[str, Plugin]: def load(self) -> dict[str, Plugin]:
if self.cache_file is None or not self.cache_file.exists(): if self.cache_file is None or not self.cache_file.exists():
return {} return {}
downloads: Dict[str, Plugin] = {} downloads: dict[str, Plugin] = {}
with open(self.cache_file) as f: with open(self.cache_file) as f:
data = json.load(f) data = json.load(f)
for attr in data.values(): for attr in data.values():
@ -707,7 +831,7 @@ class Cache:
data[name] = attr.as_json() data[name] = attr.as_json()
json.dump(data, f, indent=4, sort_keys=True) json.dump(data, f, indent=4, sort_keys=True)
def __getitem__(self, key: str) -> Optional[Plugin]: def __getitem__(self, key: str) -> Plugin | None:
return self.downloads.get(key, None) return self.downloads.get(key, None)
def __setitem__(self, key: str, value: Plugin) -> None: def __setitem__(self, key: str, value: Plugin) -> None:
@ -716,7 +840,7 @@ class Cache:
def prefetch( def prefetch(
pluginDesc: PluginDesc, cache: Cache pluginDesc: PluginDesc, cache: Cache
) -> Tuple[PluginDesc, Union[Exception, Plugin], Optional[Repo]]: ) -> tuple[PluginDesc, Exception | Plugin, Repo | None]:
try: try:
plugin, redirect = prefetch_plugin(pluginDesc, cache) plugin, redirect = prefetch_plugin(pluginDesc, cache)
cache[plugin.commit] = plugin cache[plugin.commit] = plugin
@ -731,7 +855,7 @@ def rewrite_input(
deprecated: Path, deprecated: Path,
# old pluginDesc and the new # old pluginDesc and the new
redirects: Redirects = {}, redirects: Redirects = {},
append: List[PluginDesc] = [], append: list[PluginDesc] = [],
): ):
log.info("Rewriting input file %s", input_file) log.info("Rewriting input file %s", input_file)
plugins = load_plugins_from_csv(config, input_file) plugins = load_plugins_from_csv(config, input_file)
@ -779,7 +903,7 @@ def rewrite_input(
writer.writerow(asdict(plugin)) writer.writerow(asdict(plugin))
def commit(repo: git.Repo, message: str, files: List[Path]) -> None: def commit(repo: git.Repo, message: str, files: list[Path]) -> None:
repo.index.add([str(f.resolve()) for f in files]) repo.index.add([str(f.resolve()) for f in files])
if repo.index.diff("HEAD"): if repo.index.diff("HEAD"):
@ -802,7 +926,14 @@ def update_plugins(editor: Editor, args):
) )
fetch_config = FetchConfig(args.proc, args.github_token) fetch_config = FetchConfig(args.proc, args.github_token)
update = editor.get_update(args.input_file, args.outfile, fetch_config) update = editor.get_update(
input_file=args.input_file,
output_file=args.outfile,
config=fetch_config,
to_update=getattr( # if script was called without arguments
args, "update_only", None
),
)
start_time = time.time() start_time = time.time()
redirects = update() redirects = update()

View File

@ -12,7 +12,6 @@ import inspect
import os import os
import sys import sys
from pathlib import Path from pathlib import Path
from typing import List, Tuple
# Import plugin update library from maintainers/scripts/pluginupdate.py # Import plugin update library from maintainers/scripts/pluginupdate.py
ROOT = Path(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))) # type: ignore ROOT = Path(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))) # type: ignore
@ -21,13 +20,11 @@ sys.path.insert(
) )
import pluginupdate import pluginupdate
GET_PLUGINS = f"""( GET_PLUGINS = f"""with import <localpkgs> {{ }};
with import <localpkgs> {{ }};
let let
inherit (kakouneUtils.override {{ }}) buildKakounePluginFrom2Nix; inherit (kakouneUtils.override {{ }}) buildKakounePluginFrom2Nix;
generated = callPackage {ROOT}/generated.nix {{ generated = callPackage {ROOT}/generated.nix {{ inherit buildKakounePluginFrom2Nix; }};
inherit buildKakounePluginFrom2Nix;
}};
hasChecksum = hasChecksum =
value: value:
lib.isAttrs value lib.isAttrs value
@ -35,20 +32,23 @@ let
"src" "src"
"outputHash" "outputHash"
] value; ] value;
getChecksum =
name: value: parse = name: value: {{
if hasChecksum value then pname = value.pname;
{{ version = value.version;
submodules = value.src.fetchSubmodules or false; homePage = value.meta.homepage;
sha256 = value.src.outputHash; checksum =
rev = value.src.rev; if hasChecksum value then
}} {{
else submodules = value.src.fetchSubmodules or false;
null; sha256 = value.src.outputHash;
checksums = lib.mapAttrs getChecksum generated; rev = value.src.rev;
}}
else
null;
}};
in in
lib.filterAttrs (n: v: v != null) checksums lib.mapAttrs parse generated"""
)"""
HEADER = "# This file has been @generated by ./pkgs/applications/editors/kakoune/plugins/update.py. Do not edit!" HEADER = "# This file has been @generated by ./pkgs/applications/editors/kakoune/plugins/update.py. Do not edit!"
@ -56,7 +56,7 @@ HEADER = "# This file has been @generated by ./pkgs/applications/editors/kakoune
class KakouneEditor(pluginupdate.Editor): class KakouneEditor(pluginupdate.Editor):
def generate_nix( def generate_nix(
self, self,
plugins: List[Tuple[pluginupdate.PluginDesc, pluginupdate.Plugin]], plugins: list[tuple[pluginupdate.PluginDesc, pluginupdate.Plugin]],
outfile: str, outfile: str,
): ):
with open(outfile, "w+") as f: with open(outfile, "w+") as f:

View File

@ -6,6 +6,7 @@ let
generated = callPackage <localpkgs/pkgs/applications/editors/vim/plugins/generated.nix> { generated = callPackage <localpkgs/pkgs/applications/editors/vim/plugins/generated.nix> {
inherit buildNeovimPlugin buildVimPlugin; inherit buildNeovimPlugin buildVimPlugin;
} { } { }; } { } { };
hasChecksum = hasChecksum =
value: value:
lib.isAttrs value lib.isAttrs value
@ -13,16 +14,20 @@ let
"src" "src"
"outputHash" "outputHash"
] value; ] value;
getChecksum =
name: value: parse = name: value: {
if hasChecksum value then pname = value.pname;
{ version = value.version;
submodules = value.src.fetchSubmodules or false; homePage = value.meta.homepage;
sha256 = value.src.outputHash; checksum =
rev = value.src.rev; if hasChecksum value then
} {
else submodules = value.src.fetchSubmodules or false;
null; sha256 = value.src.outputHash;
checksums = lib.mapAttrs getChecksum generated; rev = value.src.rev;
}
else
null;
};
in in
lib.filterAttrs (n: v: v != null) checksums lib.mapAttrs parse generated

View File

@ -17,7 +17,6 @@ import textwrap
from dataclasses import dataclass from dataclasses import dataclass
from multiprocessing.dummy import Pool from multiprocessing.dummy import Pool
from pathlib import Path from pathlib import Path
from typing import List, Optional, Tuple
import pluginupdate import pluginupdate
from pluginupdate import FetchConfig, update_plugins from pluginupdate import FetchConfig, update_plugins
@ -49,18 +48,18 @@ class LuaPlugin:
"""Name of the plugin, as seen on luarocks.org""" """Name of the plugin, as seen on luarocks.org"""
rockspec: str rockspec: str
"""Full URI towards the rockspec""" """Full URI towards the rockspec"""
ref: Optional[str] ref: str | None
"""git reference (branch name/tag)""" """git reference (branch name/tag)"""
version: Optional[str] version: str | None
"""Set it to pin a package """ """Set it to pin a package """
server: Optional[str] server: str | None
"""luarocks.org registers packages under different manifests. """luarocks.org registers packages under different manifests.
Its value can be 'http://luarocks.org/dev' Its value can be 'http://luarocks.org/dev'
""" """
luaversion: Optional[str] luaversion: str | None
"""lua version if a package is available only for a specific lua version""" """lua version if a package is available only for a specific lua version"""
maintainers: Optional[str] maintainers: str | None
""" Optional string listing maintainers separated by spaces""" """Optional string listing maintainers separated by spaces"""
@property @property
def normalized_name(self) -> str: def normalized_name(self) -> str:
@ -77,7 +76,7 @@ class LuaEditor(pluginupdate.Editor):
def get_current_plugins(self): def get_current_plugins(self):
return [] return []
def load_plugin_spec(self, input_file) -> List[LuaPlugin]: def load_plugin_spec(self, input_file) -> list[LuaPlugin]:
luaPackages = [] luaPackages = []
csvfilename = input_file csvfilename = input_file
log.info("Loading package descriptions from %s", csvfilename) log.info("Loading package descriptions from %s", csvfilename)
@ -95,7 +94,7 @@ class LuaEditor(pluginupdate.Editor):
def update(self, args): def update(self, args):
update_plugins(self, args) update_plugins(self, args)
def generate_nix(self, results: List[Tuple[LuaPlugin, str]], outfilename: str): def generate_nix(self, results: list[tuple[LuaPlugin, str]], outfilename: str):
with tempfile.NamedTemporaryFile("w+") as f: with tempfile.NamedTemporaryFile("w+") as f:
f.write(HEADER) f.write(HEADER)
header2 = textwrap.dedent( header2 = textwrap.dedent(
@ -121,7 +120,16 @@ class LuaEditor(pluginupdate.Editor):
def attr_path(self): def attr_path(self):
return "luaPackages" return "luaPackages"
def get_update(self, input_file: str, outfile: str, config: FetchConfig): def get_update(
self,
input_file: str,
outfile: str,
config: FetchConfig,
# TODO: implement support for adding/updating individual plugins
to_update: list[str] | None,
):
if to_update is not None:
raise NotImplementedError("For now, lua updater doesn't support updating individual packages.")
_prefetch = generate_pkg_nix _prefetch = generate_pkg_nix
def update() -> dict: def update() -> dict: