diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml index 35e1d84..577a9e4 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -13,6 +13,9 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 + - # Required for the package command tests to work + name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 - name: Set up Python uses: actions/setup-python@v5 with: diff --git a/.gitignore b/.gitignore index 2dc53ca..e605249 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,5 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ + +.DS_Store \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..f416a77 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Copy the code and install dependencies +COPY requirements.txt . +COPY setup.cfg . +COPY setup.py . +COPY cloudquery cloudquery +COPY main.py . +RUN pip3 install --no-cache-dir -r requirements.txt + +EXPOSE 7777 + +ENTRYPOINT ["python3", "main.py"] + +CMD ["serve", "--address", "[::]:7777", "--log-format", "json", "--log-level", "info"] \ No newline at end of file diff --git a/cloudquery/sdk/internal/memdb/memdb.py b/cloudquery/sdk/internal/memdb/memdb.py index 4f422f5..e9315fd 100644 --- a/cloudquery/sdk/internal/memdb/memdb.py +++ b/cloudquery/sdk/internal/memdb/memdb.py @@ -3,6 +3,7 @@ from cloudquery.sdk import schema from typing import List, Generator, Dict import pyarrow as pa +from cloudquery.sdk.types import JSONType NAME = "memdb" VERSION = "development" @@ -10,9 +11,67 @@ class MemDB(plugin.Plugin): def __init__(self) -> None: - super().__init__(NAME, VERSION) + super().__init__( + NAME, VERSION, opts=plugin.plugin.Options(team="cloudquery", kind="source") + ) self._db: Dict[str, pa.RecordBatch] = {} - self._tables: Dict[str, schema.Table] = {} + self._tables: Dict[str, schema.Table] = { + "table_1": schema.Table( + name="table_1", + columns=[ + schema.Column( + name="name", + type=pa.string(), + primary_key=True, + not_null=True, + unique=True, + ), + schema.Column( + name="id", + type=pa.string(), + primary_key=True, + not_null=True, + unique=True, + incremental_key=True, + ), + ], + title="Table 1", + description="Test Table 1", + is_incremental=True, + relations=[ + schema.Table( + name="table_1_relation_1", + columns=[ + schema.Column( + name="name", + type=pa.string(), + primary_key=True, + not_null=True, + unique=True, + ), + schema.Column(name="data", type=JSONType()), + ], + title="Table 1 Relation 1", + description="Test Table 1 Relation 1", + ) + ], + ), + "table_2": schema.Table( + name="table_2", + columns=[ + schema.Column( + name="name", + type=pa.string(), + primary_key=True, + not_null=True, + unique=True, + ), + schema.Column(name="id", type=pa.string()), + ], + title="Table 2", + description="Test Table 2", + ), + } def get_tables(self, options: plugin.TableOptions = None) -> List[plugin.Table]: tables = list(self._tables.values()) diff --git a/cloudquery/sdk/plugin/plugin.py b/cloudquery/sdk/plugin/plugin.py index 2462bc1..ff20cd6 100644 --- a/cloudquery/sdk/plugin/plugin.py +++ b/cloudquery/sdk/plugin/plugin.py @@ -29,10 +29,32 @@ class SyncOptions: backend_options: BackendOptions = None +@dataclass +class BuildTarget: + os: str = None + arch: str = None + + +@dataclass +class Options: + dockerfile: str = None + build_targets: List[BuildTarget] = None + team: str = None + kind: str = None + + class Plugin: - def __init__(self, name: str, version: str) -> None: + def __init__(self, name: str, version: str, opts: Options = None) -> None: self._name = name self._version = version + self._opts = Options() if opts is None else opts + if self._opts.dockerfile is None: + self._opts.dockerfile = "Dockerfile" + if self._opts.build_targets is None: + self._opts.build_targets = [ + BuildTarget("linux", "amd64"), + BuildTarget("linux", "arm64"), + ] def init(self, spec: bytes, no_connection: bool = False) -> None: pass @@ -46,6 +68,18 @@ def name(self) -> str: def version(self) -> str: return self._version + def team(self) -> str: + return self._opts.team + + def kind(self) -> str: + return self._opts.kind + + def dockerfile(self) -> str: + return self._opts.dockerfile + + def build_targets(self) -> List[BuildTarget]: + return self._opts.build_targets + def get_tables(self, options: TableOptions) -> List[Table]: raise NotImplementedError() diff --git a/cloudquery/sdk/schema/table.py b/cloudquery/sdk/schema/table.py index 0c740c8..89adb29 100644 --- a/cloudquery/sdk/schema/table.py +++ b/cloudquery/sdk/schema/table.py @@ -151,6 +151,8 @@ def filter_dfs_func(tt: List[Table], include, exclude, skip_dependent_tables: bo filtered_tables = [] for t in tt: filtered_table = copy.deepcopy(t) + for r in filtered_table.relations: + r.parent = filtered_table filtered_table = _filter_dfs_impl( filtered_table, False, include, exclude, skip_dependent_tables ) diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py index 23f56cc..ebc59f7 100644 --- a/cloudquery/sdk/serve/plugin.py +++ b/cloudquery/sdk/serve/plugin.py @@ -1,7 +1,13 @@ import argparse +import hashlib +import json import logging import os +import shutil +import subprocess +import tarfile from concurrent import futures +from pathlib import Path import grpc import structlog @@ -9,10 +15,13 @@ from cloudquery.discovery_v1 import discovery_pb2_grpc from cloudquery.plugin_v3 import plugin_pb2_grpc from structlog import wrap_logger +from cloudquery.sdk import plugin + from cloudquery.sdk.internal.servers.discovery_v1.discovery import DiscoveryServicer from cloudquery.sdk.internal.servers.plugin_v3 import PluginServicer from cloudquery.sdk.plugin.plugin import Plugin +from cloudquery.sdk.schema import table _IS_WINDOWS = sys.platform == "win32" @@ -74,6 +83,14 @@ def get_logger(args): return log +def calc_sha256_checksum(filename: str): + with open(filename, "rb") as f: + file_hash = hashlib.sha256() + while chunk := f.read(32768): + file_hash.update(chunk) + return file_hash.hexdigest() + + class PluginCommand: def __init__(self, plugin: Plugin): self._plugin = plugin @@ -82,6 +99,20 @@ def run(self, args): parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest="command", required=True) + self._register_serve_command(subparsers) + self._register_package_command(subparsers) + + parsed_args = parser.parse_args(args) + + if parsed_args.command == "serve": + self._serve(parsed_args) + elif parsed_args.command == "package": + self._package(parsed_args) + else: + parser.print_help() + sys.exit(1) + + def _register_serve_command(self, subparsers): serve_parser = subparsers.add_parser("serve", help="Start plugin server") serve_parser.add_argument( "--log-format", @@ -97,7 +128,6 @@ def run(self, args): choices=["trace", "debug", "info", "warn", "error"], help="log level", ) - # ignored for now serve_parser.add_argument( "--no-sentry", @@ -118,7 +148,6 @@ def run(self, args): default="", help="Open Telemetry HTTP collector endpoint (for development only) (placeholder for future use)", ) - serve_parser.add_argument( "--address", type=str, @@ -133,13 +162,224 @@ def run(self, args): help="network to serve on. can be tcp or unix", ) - parsed_args = parser.parse_args(args) + def _register_package_command(self, subparsers): + package_parser = subparsers.add_parser( + "package", help="Package the plugin as a Docker image" + ) + package_parser.add_argument( + "version", help="version to tag the Docker image with" + ) + package_parser.add_argument("plugin-directory") + package_parser.add_argument( + "--log-format", + type=str, + default="text", + choices=["text", "json"], + help="logging format", + ) + package_parser.add_argument( + "--log-level", + type=str, + default="info", + choices=["trace", "debug", "info", "warn", "error"], + help="log level", + ) + package_parser.add_argument( + "-D", + "--dist-dir", + type=str, + help="dist directory to output the built plugin. (default: /dist)", + ) + package_parser.add_argument( + "--docs-dir", + type=str, + help="docs directory containing markdown files to copy to the dist directory. (default: /docs)", + ) + package_parser.add_argument( + "-m", + "--message", + type=str, + required=True, + help="message that summarizes what is new or changed in this version. Use @ to read from file. Supports markdown.", + ) - if parsed_args.command == "serve": - self._serve(parsed_args) - else: - parser.print_help() - sys.exit(1) + def _package(self, args): + logger = get_logger(args) + self._plugin.set_logger(logger) + + def _is_empty(val): + return val == None or len(val) == 0 + + if _is_empty(self._plugin.name()): + raise Exception("plugin name is required") + if _is_empty(self._plugin.team()): + raise Exception("plugin team is required") + if _is_empty(self._plugin.kind()): + raise Exception("plugin kind is required") + if _is_empty(self._plugin.dockerfile()): + raise Exception("plugin dockerfile is required") + if _is_empty(self._plugin.build_targets()): + raise Exception("at least one build target is required") + + plugin_directory, version, message = ( + getattr(args, "plugin-directory"), + getattr(args, "version"), + getattr(args, "message"), + ) + dist_dir = ( + "%s/dist" % plugin_directory if args.dist_dir == None else args.dist_dir + ) + docs_dir = ( + "%s/docs" % plugin_directory if args.docs_dir == None else args.docs_dir + ) + Path(dist_dir).mkdir(0o755, exist_ok=True, parents=True) + + self._copy_docs(logger, docs_dir, dist_dir) + self._write_tables_json(logger, dist_dir) + supported_targets = self._build_dockerfile( + logger, plugin_directory, dist_dir, version + ) + self._write_package_json(logger, dist_dir, message, version, supported_targets) + logger.info("Done packaging plugin to '%s'" % dist_dir) + + def _write_package_json(self, logger, dist_dir, message, version, supportedTargets): + package_json_path = "%s/package.json" % dist_dir + logger.info("Writing package.json to '%s'" % package_json_path) + content = { + "schema_version": 1, + "name": self._plugin.name(), + "team": self._plugin.team(), + "kind": self._plugin.kind(), + "version": version, + "message": message, + "protocols": [3], + "supported_targets": supportedTargets, + "package_type": "docker", + } + with open("%s/package.json" % dist_dir, "w") as f: + f.write(json.dumps(content, indent=2)) + + def _copy_docs(self, logger, docs_dir, dist_dir): + # check is docs_dir exists + if not os.path.isdir(docs_dir): + raise Exception("docs directory '%s' does not exist" % docs_dir) + + output_docs_dir = "%s/docs" % dist_dir + logger.info("Copying docs from '%s' to '%s'" % (docs_dir, output_docs_dir)) + shutil.copytree(docs_dir, output_docs_dir, dirs_exist_ok=True) + + def _write_tables_json(self, logger, dist_dir): + if self._plugin.kind() != "source": + return + + tables_json_output_path = "%s/tables.json" % dist_dir + logger.info("Writing tables to '%s'" % tables_json_output_path) + self._plugin.init(spec=b"", no_connection=True) + tables = self._plugin.get_tables( + options=plugin.plugin.TableOptions( + tables=["*"], skip_tables=[], skip_dependent_tables=False + ) + ) + flattened_tables = table.flatten_tables(tables) + + def column_to_json(column: table.Column): + return { + "name": column.name, + "type": str(column.type), + "description": column.description, + "incremental_key": column.incremental_key, + "primary_key": column.primary_key, + "not_null": column.not_null, + "unique": column.unique, + } + + def table_to_json(table: table.Table): + return { + "name": table.name, + "title": table.title, + "description": table.description, + "is_incremental": table.is_incremental, + "parent": table.parent.name if table.parent else "", + "relations": list(map(lambda r: r.name, table.relations)), + "columns": list(map(column_to_json, table.columns)), + } + + tables_json = list(map(table_to_json, flattened_tables)) + with open(tables_json_output_path, "w") as f: + f.write(json.dumps(tables_json)) + logger.info( + "Wrote %d tables to '%s'" % (len(tables_json), tables_json_output_path) + ) + + def _build_dockerfile(self, logger, plugin_dir, dist_dir, version): + dockerfile_path = "%s/%s" % (plugin_dir, self._plugin.dockerfile()) + if not os.path.isfile(dockerfile_path): + raise Exception("Dockerfile '%s' does not exist" % dockerfile_path) + + def run_docker_cmd(cmd, plugin_dir): + result = subprocess.run(cmd, capture_output=True, cwd=plugin_dir) + if result.returncode != 0: + err = ( + "" + if result.stderr is None + else result.stderr.decode("ascii").strip() + ) + raise ChildProcessError("Unable to run Docker command: %s" % err) + + def build_target(target: plugin.plugin.BuildTarget): + image_repository = "registry.cloudquery.io/%s/%s-%s" % ( + self._plugin.team(), + self._plugin.kind(), + self._plugin.name(), + ) + image_tag = "%s:%s-%s-%s" % ( + image_repository, + version, + target.os, + target.arch, + ) + image_tar = "plugin-%s-%s-%s-%s.tar" % ( + self._plugin.name(), + version, + target.os, + target.arch, + ) + image_path = "%s/%s" % (dist_dir, image_tar) + logger.info("Building docker image %s" % image_tag) + docker_build_arguments = [ + "docker", + "buildx", + "build", + "-t", + image_tag, + "--platform", + "%s/%s" % (target.os, target.arch), + "-f", + dockerfile_path, + ".", + "--progress", + "plain", + "--load", + ] + logger.debug( + "Running command 'docker %s'" % " ".join(docker_build_arguments) + ) + run_docker_cmd(docker_build_arguments, plugin_dir) + logger.debug("Saving docker image '%s' to '%s'" % (image_tag, image_path)) + docker_save_arguments = ["docker", "save", "-o", image_path, image_tag] + logger.debug("Running command 'docker %s'", " ".join(docker_save_arguments)) + run_docker_cmd(docker_save_arguments, plugin_dir) + return { + "os": target.os, + "arch": target.arch, + "path": image_tar, + "checksum": calc_sha256_checksum(image_path), + "docker_image_tag": image_tag, + } + + logger.info("Building %d targets" % len(self._plugin.build_targets())) + supported_targets = list(map(build_target, self._plugin.build_targets())) + return supported_targets def _serve(self, args): logger = get_logger(args) diff --git a/cloudquery/sdk/types/json.py b/cloudquery/sdk/types/json.py index 28bf3bb..ed9ed1f 100644 --- a/cloudquery/sdk/types/json.py +++ b/cloudquery/sdk/types/json.py @@ -13,6 +13,9 @@ def __arrow_ext_serialize__(self): # metadata to be deserialized return b"json-serialized" + def __str__(self): + return "json" + @classmethod def __arrow_ext_deserialize__(self, storage_type, serialized): # return an instance of this subclass given the serialized diff --git a/cloudquery/sdk/types/uuid.py b/cloudquery/sdk/types/uuid.py index a9ef571..d3564a9 100644 --- a/cloudquery/sdk/types/uuid.py +++ b/cloudquery/sdk/types/uuid.py @@ -15,6 +15,9 @@ def __arrow_ext_serialize__(self): # metadata to be deserialized return b"uuid-serialized" + def __str__(self): + return "uuid" + @classmethod def __arrow_ext_deserialize__(self, storage_type, serialized): # return an instance of this subclass given the serialized diff --git a/docs/overview.md b/docs/overview.md new file mode 100644 index 0000000..027ec10 --- /dev/null +++ b/docs/overview.md @@ -0,0 +1,3 @@ +# MemDB Plugin + +Test docs for the MemDB plugin. diff --git a/main.py b/main.py new file mode 100644 index 0000000..06e81ce --- /dev/null +++ b/main.py @@ -0,0 +1,13 @@ +import sys +from cloudquery.sdk import serve + +from cloudquery.sdk.internal.memdb import MemDB + + +def main(): + p = MemDB() + serve.PluginCommand(p).run(sys.argv[1:]) + + +if __name__ == "__main__": + main() diff --git a/tests/serve/plugin.py b/tests/serve/plugin.py index 26c212c..6717b41 100644 --- a/tests/serve/plugin.py +++ b/tests/serve/plugin.py @@ -1,3 +1,5 @@ +import json +import os import random import grpc import time @@ -58,7 +60,7 @@ def writer_iterator(): response = stub.GetTables(plugin_pb2.GetTables.Request(tables=["*"])) schemas = arrow.new_schemas_from_bytes(response.tables) - assert len(schemas) == 1 + assert len(schemas) == 4 response = stub.Sync(plugin_pb2.Sync.Request(tables=["*"])) total_records = 0 @@ -70,3 +72,134 @@ def writer_iterator(): finally: cmd.stop() pool.shutdown() + + +def test_plugin_package(): + p = MemDB() + cmd = serve.PluginCommand(p) + cmd.run(["package", "-m", "test", "v1.0.0", "."]) + assert os.path.isfile("dist/tables.json") + assert os.path.isfile("dist/package.json") + assert os.path.isfile("dist/docs/overview.md") + assert os.path.isfile("dist/plugin-memdb-v1.0.0-linux-amd64.tar") + assert os.path.isfile("dist/plugin-memdb-v1.0.0-linux-arm64.tar") + + with open("dist/tables.json", "r") as f: + tables = json.loads(f.read()) + assert tables == [ + { + "name": "table_1", + "title": "Table 1", + "description": "Test Table 1", + "is_incremental": True, + "parent": "", + "relations": ["table_1_relation_1"], + "columns": [ + { + "name": "name", + "type": "string", + "description": "", + "incremental_key": False, + "primary_key": True, + "not_null": True, + "unique": True, + }, + { + "name": "id", + "type": "string", + "description": "", + "incremental_key": True, + "primary_key": True, + "not_null": True, + "unique": True, + }, + ], + }, + { + "name": "table_1_relation_1", + "title": "Table 1 Relation 1", + "description": "Test Table 1 Relation 1", + "is_incremental": False, + "parent": "table_1", + "relations": [], + "columns": [ + { + "name": "name", + "type": "string", + "description": "", + "incremental_key": False, + "primary_key": True, + "not_null": True, + "unique": True, + }, + { + "name": "data", + "type": "json", + "description": "", + "incremental_key": False, + "primary_key": False, + "not_null": False, + "unique": False, + }, + ], + }, + { + "name": "table_2", + "title": "Table 2", + "description": "Test Table 2", + "is_incremental": False, + "parent": "", + "relations": [], + "columns": [ + { + "name": "name", + "type": "string", + "description": "", + "incremental_key": False, + "primary_key": True, + "not_null": True, + "unique": True, + }, + { + "name": "id", + "type": "string", + "description": "", + "incremental_key": False, + "primary_key": False, + "not_null": False, + "unique": False, + }, + ], + }, + ] + with open("dist/package.json", "r") as f: + package = json.loads(f.read()) + assert package["schema_version"] == 1 + assert package["name"] == "memdb" + assert package["version"] == "v1.0.0" + assert package["team"] == "cloudquery" + assert package["kind"] == "source" + assert package["message"] == "test" + assert package["protocols"] == [3] + assert len(package["supported_targets"]) == 2 + assert package["package_type"] == "docker" + assert package["supported_targets"][0]["os"] == "linux" + assert package["supported_targets"][0]["arch"] == "amd64" + assert ( + package["supported_targets"][0]["path"] + == "plugin-memdb-v1.0.0-linux-amd64.tar" + ) + assert ( + package["supported_targets"][0]["docker_image_tag"] + == "registry.cloudquery.io/cloudquery/source-memdb:v1.0.0-linux-amd64" + ) + assert package["supported_targets"][1]["os"] == "linux" + assert package["supported_targets"][1]["arch"] == "arm64" + assert ( + package["supported_targets"][1]["path"] + == "plugin-memdb-v1.0.0-linux-arm64.tar" + ) + assert ( + package["supported_targets"][1]["docker_image_tag"] + == "registry.cloudquery.io/cloudquery/source-memdb:v1.0.0-linux-arm64" + )