From 681f0786724c31c46771fc5e3b50100ad9734ff0 Mon Sep 17 00:00:00 2001 From: barslev Date: Tue, 12 May 2026 11:06:33 +0200 Subject: [PATCH 1/5] feat: stream CLI logs to /python-cli-runs/* lifecycle endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Buffer the CLI's own log records and POST them in 5s batches to a new register/upload/finalize lifecycle so the admin dashboard renders what the user saw in their terminal alongside the run's terminal status. New modules: - core/cli_run.py — register_cli_run / finalize_cli_run helpers - core/log_uploader.py — BatchedLogUploader (daemon-thread flusher, chunked under the 256KB cap, swallows network errors, drains on shutdown) and UploadingLogHandler routing log records to it - core/streaming.py — setup_streaming() wires both into the socketcli and socketdev loggers, forces them to DEBUG so uploads capture the full history regardless of local terminal verbosity, and returns a teardown callable for the caller to register with atexit - set_run_status() propagates the terminal status through the teardown; socketcli.py exception handlers call it for KeyboardInterrupt (cancelled), uncaught Exception (failure), and any SystemExit with a non-zero code (failure) so sys.exit() paths inside main_code surface correctly instead of defaulting to success Best-effort end-to-end: registration failures fall back to no-streaming and never block the scan. Opt out with --disable-server-log-streaming. Tested against local depscan with the matching /v0/python-cli-runs/* endpoints; 173 unit tests pass. 
--- socketsecurity/config.py | 14 ++ socketsecurity/core/cli_run.py | 66 ++++++++++ socketsecurity/core/log_uploader.py | 150 +++++++++++++++++++++ socketsecurity/core/streaming.py | 79 ++++++++++++ socketsecurity/socketcli.py | 21 +++ tests/unit/test_cli_run.py | 76 +++++++++++ tests/unit/test_log_uploader.py | 193 ++++++++++++++++++++++++++++ tests/unit/test_streaming.py | 117 +++++++++++++++++ 8 files changed, 716 insertions(+) create mode 100644 socketsecurity/core/cli_run.py create mode 100644 socketsecurity/core/log_uploader.py create mode 100644 socketsecurity/core/streaming.py create mode 100644 tests/unit/test_cli_run.py create mode 100644 tests/unit/test_log_uploader.py create mode 100644 tests/unit/test_streaming.py diff --git a/socketsecurity/config.py b/socketsecurity/config.py index 1d18c6a..c344a12 100644 --- a/socketsecurity/config.py +++ b/socketsecurity/config.py @@ -92,6 +92,7 @@ class CliConfig: ignore_commit_files: bool = False disable_blocking: bool = False disable_ignore: bool = False + disable_server_log_streaming: bool = False strict_blocking: bool = False integration_type: IntegrationType = "api" integration_org_slug: Optional[str] = None @@ -207,6 +208,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig': 'ignore_commit_files': args.ignore_commit_files, 'disable_blocking': args.disable_blocking, 'disable_ignore': args.disable_ignore, + 'disable_server_log_streaming': args.disable_server_log_streaming, 'strict_blocking': args.strict_blocking, 'integration_type': args.integration, 'pending_head': args.pending_head, @@ -716,6 +718,18 @@ def create_argument_parser() -> argparse.ArgumentParser: action="store_true", help=argparse.SUPPRESS ) + advanced_group.add_argument( + "--disable-server-log-streaming", + dest="disable_server_log_streaming", + action="store_true", + help="Disable streaming server-side log lines to the terminal during long-running CLI operations." 
+ ) + advanced_group.add_argument( + "--disable_server_log_streaming", + dest="disable_server_log_streaming", + action="store_true", + help=argparse.SUPPRESS + ) advanced_group.add_argument( "--strict-blocking", dest="strict_blocking", diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py new file mode 100644 index 0000000..205e812 --- /dev/null +++ b/socketsecurity/core/cli_run.py @@ -0,0 +1,66 @@ +"""Lifecycle helpers for a CLI run on the Socket backend. + +A "run" represents a single CLI invocation. `register_cli_run` opens it and +returns a server-issued `run_id`; `finalize_cli_run` closes it on exit. The +run_id keys the rows that `BatchedLogUploader` POSTs to +`/python-cli-runs/<run_id>/logs` during the run so the dashboard can show +what the user saw in their terminal. + +Both calls are best-effort: failures fall back to no-streaming and never +prevent the scan from running. +""" + +import json +import logging +from typing import Optional + +from .cli_client import CliClient +from .exceptions import APIFailure + +log = logging.getLogger("socketcli") + + +def register_cli_run( + client: CliClient, + client_version: str, + integration: Optional[str] = None, +) -> Optional[str]: + payload = {"client_version": client_version} + if integration: + payload["integration"] = integration + try: + resp = client.request( + path="python-cli-runs", + method="POST", + payload=json.dumps(payload), + ) + except APIFailure as e: + log.debug(f"cli-run register failed (streaming disabled): {e}") + return None + + try: + body = resp.json() + except (ValueError, json.JSONDecodeError) as e: + log.debug(f"cli-run register: bad JSON body: {e}") + return None + + run_id = body.get("run_id") + if not isinstance(run_id, str) or not run_id: + log.debug(f"cli-run register: missing run_id in response: {body!r}") + return None + return run_id + + +def finalize_cli_run( + client: CliClient, + run_id: str, + status: str = "success", +) -> None: + try: + client.request( 
path=f"python-cli-runs/{run_id}/finalize", + method="POST", + payload=json.dumps({"status": status}), + ) + except Exception as e: + log.debug(f"cli-run finalize failed (swallowed): {e}") diff --git a/socketsecurity/core/log_uploader.py b/socketsecurity/core/log_uploader.py new file mode 100644 index 0000000..cbc2c4c --- /dev/null +++ b/socketsecurity/core/log_uploader.py @@ -0,0 +1,150 @@ +"""Buffer the CLI's local log records and POST them in batches to +/python-cli-runs/<run_id>/logs so the dashboard's view of a CLI run +mirrors what the user sees in their terminal. + +Behavior: +- daemon thread, 5s flush +- swallow all network errors (debug log only) +- skip empty buffers +- drain on shutdown +- at-most-once semantics (failed batches dropped, not retried) + +A thread-local recursion guard prevents the uploader's own request-error +log lines (emitted by `cli_client.py`'s `socketdev` logger) from being +re-enqueued during a flush. +""" + +import json +import logging +import threading +from datetime import datetime, timezone +from typing import Optional + +from .cli_client import CliClient + +log = logging.getLogger(__name__) + +_FLUSH_GUARD = threading.local() + +_MAX_BATCH_BYTES = 256 * 1024 - 1024 # depscan body cap is 256KB; reserve headroom for envelope/headers + +_LEVEL_MAP = { + logging.DEBUG: "DEBUG", + logging.INFO: "INFO", + logging.WARNING: "WARN", + logging.ERROR: "ERROR", + logging.CRITICAL: "ERROR", +} + + +def _now_str() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + +class BatchedLogUploader: + def __init__( + self, + client: CliClient, + run_id: str, + flush_interval: float = 5.0, + ): + self._client = client + self._run_id = run_id + self._flush_interval = flush_interval + self._buf: list = [] + self._lock = threading.Lock() + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + + def add(self, entry: dict) -> None: + with self._lock: + self._buf.append(entry) + + def start(self) -> 
None: + if self._thread is not None: + return + self._thread = threading.Thread( + target=self._run, + name=f"socket-log-uploader-{self._run_id[:8]}", + daemon=True, + ) + self._thread.start() + + def stop(self, timeout: float = 2.0) -> None: + if self._thread is None: + self._flush() + return + self._stop.set() + self._thread.join(timeout=timeout) + self._thread = None + self._flush() + + def _run(self) -> None: + while not self._stop.is_set(): + self._flush() + self._stop.wait(self._flush_interval) + + def _flush(self) -> None: + with self._lock: + if not self._buf: + return + entries = self._buf + self._buf = [] + + _FLUSH_GUARD.active = True + try: + for chunk in _chunk_by_size(entries): + try: + self._client.request( + path=f"python-cli-runs/{self._run_id}/logs", + method="POST", + payload=json.dumps({"logs": chunk}), + ) + except Exception as e: + log.debug(f"log upload failed (swallowed, {len(chunk)} entries dropped): {e}") + finally: + _FLUSH_GUARD.active = False + + +def _chunk_by_size(entries: list) -> list: + """Split entries into chunks that each serialize to <= _MAX_BATCH_BYTES. 
+ Single entries that exceed the cap are dropped with a debug log.""" + chunks: list = [] + current: list = [] + envelope = len('{"logs":[]}') + current_size = envelope + for entry in entries: + entry_size = len(json.dumps(entry)) + 1 # +1 for inter-entry comma + if entry_size + envelope > _MAX_BATCH_BYTES: + log.debug(f"log entry too large ({entry_size}B), dropped") + continue + if current and current_size + entry_size > _MAX_BATCH_BYTES: + chunks.append(current) + current = [entry] + current_size = envelope + entry_size + else: + current.append(entry) + current_size += entry_size + if current: + chunks.append(current) + return chunks + + +class UploadingLogHandler(logging.Handler): + def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"): + super().__init__() + self._uploader = uploader + self._context = context + + def emit(self, record: logging.LogRecord) -> None: + if getattr(_FLUSH_GUARD, "active", False): + return + try: + self._uploader.add({ + "timestamp": _now_str(), + "level": _LEVEL_MAP.get(record.levelno, "INFO"), + "message": self.format(record), + "context": self._context, + }) + except Exception: + self.handleError(record) diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py new file mode 100644 index 0000000..cd6ede9 --- /dev/null +++ b/socketsecurity/core/streaming.py @@ -0,0 +1,79 @@ +"""Wire the server log streaming pipeline for one CLI run. + +`setup_streaming` registers the run with the backend, attaches handlers that +route the CLI's own log output through both the local terminal and a batched +uploader, and forces the loggers into DEBUG so the upload captures everything +regardless of local terminal verbosity. + +Returns a teardown callable to invoke on exit (typically via `atexit.register`). +Returns None if registration failed; in that case nothing was wired up. 
+""" + +import logging +from typing import Callable, Optional + +from .cli_client import CliClient +from .cli_run import finalize_cli_run, register_cli_run +from .log_uploader import BatchedLogUploader, UploadingLogHandler + +_run_status: str = "success" + + +def set_run_status(status: str) -> None: + global _run_status + _run_status = status + + +def setup_streaming( + *, + client: CliClient, + cli_logger: logging.Logger, + sdk_logger: logging.Logger, + client_version: str, + integration: Optional[str], + enable_debug: bool, +) -> Optional[Callable[[], None]]: + run_id = register_cli_run( + client, + client_version=client_version, + integration=integration, + ) + if not run_id: + cli_logger.debug("server log streaming disabled (register failed)") + return None + + log_uploader = BatchedLogUploader(client, run_id) + log_uploader.start() + upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli") + upload_handler.setFormatter(logging.Formatter("%(message)s")) + + terminal_handler = logging.StreamHandler() + terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO) + terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s")) + + saved_levels = (cli_logger.level, sdk_logger.level) + saved_propagate = (cli_logger.propagate, sdk_logger.propagate) + cli_logger.setLevel(logging.DEBUG) + sdk_logger.setLevel(logging.DEBUG) + cli_logger.propagate = False + sdk_logger.propagate = False + cli_logger.addHandler(terminal_handler) + sdk_logger.addHandler(terminal_handler) + cli_logger.addHandler(upload_handler) + sdk_logger.addHandler(upload_handler) + + cli_logger.debug(f"server log streaming enabled (run_id={run_id})") + + def teardown() -> None: + cli_logger.removeHandler(upload_handler) + sdk_logger.removeHandler(upload_handler) + log_uploader.stop() + finalize_cli_run(client, run_id, status=_run_status) + cli_logger.removeHandler(terminal_handler) + sdk_logger.removeHandler(terminal_handler) + 
cli_logger.setLevel(saved_levels[0]) + sdk_logger.setLevel(saved_levels[1]) + cli_logger.propagate = saved_propagate[0] + sdk_logger.propagate = saved_propagate[1] + + return teardown diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index 1f2b166..ea5e73e 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -1,3 +1,4 @@ +import atexit import json import os import sys @@ -20,6 +21,7 @@ from socketsecurity.core.messages import Messages from socketsecurity.core.scm_comments import Comments from socketsecurity.core.socket_config import SocketConfig +from socketsecurity.core.streaming import set_run_status, setup_streaming from socketsecurity.output import OutputHandler socket_logger, log = initialize_logging() @@ -30,13 +32,19 @@ def cli(): try: main_code() except KeyboardInterrupt: + set_run_status("cancelled") log.info("Keyboard Interrupt detected, exiting") config = CliConfig.from_args() # Get current config if not config.disable_blocking: sys.exit(2) else: sys.exit(0) + except SystemExit as e: + if e.code: + set_run_status("failure") + raise except Exception as error: + set_run_status("failure") log.error("Unexpected error when running the cli") log.error(error) traceback.print_exc() @@ -89,6 +97,19 @@ def main_code(): client = CliClient(socket_config) sdk.api.api_url = socket_config.api_url log.debug("loaded client") + + if not config.disable_server_log_streaming: + teardown = setup_streaming( + client=client, + cli_logger=log, + sdk_logger=socket_logger, + client_version=config.version, + integration=config.integration_type, + enable_debug=config.enable_debug, + ) + if teardown: + atexit.register(teardown) + core = Core(socket_config, sdk, config) log.debug("loaded core") diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py new file mode 100644 index 0000000..1c171bd --- /dev/null +++ b/tests/unit/test_cli_run.py @@ -0,0 +1,76 @@ +import json +from unittest.mock import Mock + +from 
socketsecurity.core.cli_client import CliClient +from socketsecurity.core.cli_run import finalize_cli_run, register_cli_run +from socketsecurity.core.exceptions import APIFailure + + +def _resp(payload): + r = Mock() + r.json.return_value = payload + return r + + +def test_register_cli_run_returns_run_id(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({"run_id": "srv-issued-123"}) + + run_id = register_cli_run(client, client_version="1.2.3", integration="github") + + assert run_id == "srv-issued-123" + args, kwargs = client.request.call_args + assert kwargs["path"] == "python-cli-runs" + assert kwargs["method"] == "POST" + body = json.loads(kwargs["payload"]) + assert body == {"client_version": "1.2.3", "integration": "github"} + + +def test_register_cli_run_returns_none_on_api_failure(): + client = Mock(spec=CliClient) + client.request.side_effect = APIFailure("network down") + + assert register_cli_run(client, client_version="1.0.0") is None + + +def test_register_cli_run_returns_none_on_missing_run_id(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({}) + + assert register_cli_run(client, client_version="1.0.0") is None + + +def test_register_cli_run_returns_none_on_bad_json(): + bad = Mock() + bad.json.side_effect = ValueError("not json") + client = Mock(spec=CliClient) + client.request.return_value = bad + + assert register_cli_run(client, client_version="1.0.0") is None + + +def test_register_cli_run_omits_integration_when_falsy(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({"run_id": "x"}) + + register_cli_run(client, client_version="1.0.0", integration=None) + + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body == {"client_version": "1.0.0"} + + +def test_finalize_cli_run_posts_status(): + client = Mock(spec=CliClient) + finalize_cli_run(client, "run-x", status="failure") + + args, kwargs = client.request.call_args + assert kwargs["path"] == 
"python-cli-runs/run-x/finalize" + assert kwargs["method"] == "POST" + assert json.loads(kwargs["payload"]) == {"status": "failure"} + + +def test_finalize_cli_run_swallows_errors(): + client = Mock(spec=CliClient) + client.request.side_effect = APIFailure("network down") + + finalize_cli_run(client, "run-x") # must not raise diff --git a/tests/unit/test_log_uploader.py b/tests/unit/test_log_uploader.py new file mode 100644 index 0000000..331b885 --- /dev/null +++ b/tests/unit/test_log_uploader.py @@ -0,0 +1,193 @@ +import json +import logging +import time +from unittest.mock import Mock + +import pytest + +from socketsecurity.core.cli_client import CliClient +from socketsecurity.core.exceptions import APIFailure +from socketsecurity.core.log_uploader import ( + _MAX_BATCH_BYTES, + BatchedLogUploader, + UploadingLogHandler, + _chunk_by_size, +) +from socketsecurity.core.socket_config import SocketConfig + + +@pytest.fixture +def config(): + return SocketConfig(api_key="k", timeout=30) + + +def test_add_buffers_until_flush(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-x", flush_interval=10) + u.add({"timestamp": "t", "level": "INFO", "message": "a", "context": "c"}) + u.add({"timestamp": "t", "level": "INFO", "message": "b", "context": "c"}) + client.request.assert_not_called() + assert len(u._buf) == 2 + + +def test_flush_posts_batch_and_clears_buffer(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-y", flush_interval=10) + u.add({"timestamp": "t", "level": "INFO", "message": "a", "context": "c"}) + u.add({"timestamp": "t", "level": "WARN", "message": "b", "context": "c"}) + + u._flush() + + args, kwargs = client.request.call_args + assert kwargs["path"] == "python-cli-runs/run-y/logs" + assert kwargs["method"] == "POST" + body = json.loads(kwargs["payload"]) + assert len(body["logs"]) == 2 + assert body["logs"][0]["message"] == "a" + assert u._buf == [] + + +def test_flush_skips_empty_buffer(): + client = 
Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-z", flush_interval=10) + u._flush() + client.request.assert_not_called() + + +def test_flush_swallows_api_failure_and_drops_batch(): + client = Mock(spec=CliClient) + client.request.side_effect = APIFailure("net down") + u = BatchedLogUploader(client, "run-e", flush_interval=10) + u.add({"timestamp": "t", "level": "INFO", "message": "x", "context": "c"}) + + u._flush() # must not raise + assert u._buf == [] # batch is dropped, not retried + + +def test_stop_drains_remaining_buffer(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-d", flush_interval=10) + u.start() + u.add({"timestamp": "t", "level": "INFO", "message": "tail", "context": "c"}) + u.stop(timeout=2.0) + + assert client.request.called + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body["logs"][-1]["message"] == "tail" + + +def test_handler_emit_enqueues_record(caplog): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-h", flush_interval=10) + h = UploadingLogHandler(u) + + rec = logging.LogRecord( + name="socketcli", level=logging.WARNING, pathname=__file__, + lineno=1, msg="watch out", args=(), exc_info=None, + ) + h.emit(rec) + + assert len(u._buf) == 1 + e = u._buf[0] + assert e["level"] == "WARN" + assert e["message"] == "watch out" + assert e["context"] == "socket-python-cli" + + +def test_handler_skips_during_active_flush(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-g", flush_interval=10) + h = UploadingLogHandler(u) + + captured = {} + + def fake_request(**kwargs): + rec = logging.LogRecord( + name="socketdev", level=logging.ERROR, pathname=__file__, + lineno=1, msg="recursive!", args=(), exc_info=None, + ) + h.emit(rec) + captured["buf_len_during_flush"] = len(u._buf) + return Mock() + + client.request.side_effect = fake_request + u.add({"timestamp": "t", "level": "INFO", "message": "real", "context": "c"}) + u._flush() + + assert 
captured["buf_len_during_flush"] == 0 # recursive emit was skipped + assert u._buf == [] + + +def test_levels_map_correctly(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-l", flush_interval=10) + h = UploadingLogHandler(u) + + for py_level, expected in [ + (logging.DEBUG, "DEBUG"), + (logging.INFO, "INFO"), + (logging.WARNING, "WARN"), + (logging.ERROR, "ERROR"), + (logging.CRITICAL, "ERROR"), + ]: + rec = logging.LogRecord( + name="t", level=py_level, pathname=__file__, + lineno=1, msg="m", args=(), exc_info=None, + ) + h.emit(rec) + + levels = [e["level"] for e in u._buf] + assert levels == ["DEBUG", "INFO", "WARN", "ERROR", "ERROR"] + + +def test_chunk_by_size_keeps_small_batches_intact(): + entries = [{"timestamp": "t", "level": "INFO", "message": "x", "context": "c"}] * 5 + chunks = _chunk_by_size(entries) + assert len(chunks) == 1 + assert chunks[0] == entries + + +def test_chunk_by_size_splits_when_exceeding_cap(): + big_msg = "y" * 1000 + entries = [ + {"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", + "message": big_msg, "context": "c"} + for _ in range(500) + ] + chunks = _chunk_by_size(entries) + assert len(chunks) >= 2 + for chunk in chunks: + size = len(json.dumps({"logs": chunk})) + assert size <= _MAX_BATCH_BYTES + assert sum(len(c) for c in chunks) == len(entries) + + +def test_chunk_by_size_drops_single_oversize_entry(): + too_big = {"timestamp": "t", "level": "INFO", + "message": "z" * (_MAX_BATCH_BYTES + 100), "context": "c"} + ok = {"timestamp": "t", "level": "INFO", "message": "ok", "context": "c"} + chunks = _chunk_by_size([ok, too_big, ok]) + flat = [e for c in chunks for e in c] + assert flat == [ok, ok] # too_big dropped, smalls preserved + + +def test_flush_chunks_oversize_buffer_into_multiple_posts(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-c", flush_interval=10) + big_msg = "y" * 1000 + for _ in range(500): + u.add({"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", + 
"message": big_msg, "context": "c"}) + + u._flush() + assert client.request.call_count >= 2 # split into multiple POSTs + + +def test_run_thread_flushes_periodically_then_exits(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-t", flush_interval=0.05) + u.add({"timestamp": "t", "level": "INFO", "message": "first", "context": "c"}) + u.start() + time.sleep(0.2) # allow at least one flush tick + u.stop(timeout=1.0) + assert client.request.called diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py new file mode 100644 index 0000000..c20334d --- /dev/null +++ b/tests/unit/test_streaming.py @@ -0,0 +1,117 @@ +import logging +from unittest.mock import patch + +import pytest + +import socketsecurity.core.streaming as streaming_mod +from socketsecurity.core.streaming import set_run_status, setup_streaming + + +@pytest.fixture(autouse=True) +def reset_run_status(): + streaming_mod._run_status = "success" + yield + streaming_mod._run_status = "success" + + +def test_setup_streaming_returns_none_when_register_fails(): + with patch("socketsecurity.core.streaming.register_cli_run", return_value=None): + teardown = setup_streaming( + client=object(), + cli_logger=logging.getLogger("t-fail"), + sdk_logger=logging.getLogger("t-fail-sdk"), + client_version="1.0", + integration=None, + enable_debug=False, + ) + assert teardown is None + + +def test_teardown_finalizes_with_current_run_status(): + cli_logger = logging.getLogger("t-finalize-cli") + sdk_logger = logging.getLogger("t-finalize-sdk") + + finalize_calls = [] + + def fake_finalize(client, run_id, status="success"): + finalize_calls.append(status) + + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-1"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + teardown = setup_streaming( + 
client=object(), + cli_logger=cli_logger, + sdk_logger=sdk_logger, + client_version="1.0", + integration=None, + enable_debug=False, + ) + assert teardown is not None + + set_run_status("failure") + teardown() + + assert finalize_calls == ["failure"] + + +def test_set_run_status_default_is_success(): + cli_logger = logging.getLogger("t-default-cli") + sdk_logger = logging.getLogger("t-default-sdk") + + finalize_calls = [] + + def fake_finalize(client, run_id, status="success"): + finalize_calls.append(status) + + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-2"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + teardown = setup_streaming( + client=object(), + cli_logger=cli_logger, + sdk_logger=sdk_logger, + client_version="1.0", + integration=None, + enable_debug=False, + ) + teardown() + + assert finalize_calls == ["success"] + + +def test_setup_streaming_restores_logger_state_on_teardown(): + cli_logger = logging.getLogger("t-restore-cli") + sdk_logger = logging.getLogger("t-restore-sdk") + cli_logger.setLevel(logging.WARNING) + sdk_logger.setLevel(logging.ERROR) + cli_logger.propagate = True + sdk_logger.propagate = True + handler_count_before = (len(cli_logger.handlers), len(sdk_logger.handlers)) + + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-3"), \ + patch("socketsecurity.core.streaming.finalize_cli_run"), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + teardown = setup_streaming( + client=object(), + cli_logger=cli_logger, + sdk_logger=sdk_logger, + client_version="1.0", + integration=None, + enable_debug=False, + ) + # During streaming: levels and propagate are forced + assert cli_logger.level == logging.DEBUG + assert sdk_logger.level == logging.DEBUG + 
assert cli_logger.propagate is False + assert sdk_logger.propagate is False + teardown() + + assert cli_logger.level == logging.WARNING + assert sdk_logger.level == logging.ERROR + assert cli_logger.propagate is True + assert sdk_logger.propagate is True + assert (len(cli_logger.handlers), len(sdk_logger.handlers)) == handler_count_before From b07e6213df95455ce058528e5fa2971c2b5e00ec Mon Sep 17 00:00:00 2001 From: barslev Date: Thu, 14 May 2026 16:53:55 +0200 Subject: [PATCH 2/5] chore: drop per-batch size chunking to match upstream uploader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 256 KB ceiling I added speculatively when the server cap was 256 KB no longer matches the reference implementation we're mirroring, which sends each flush as a single POST regardless of size. With the server cap now well above any plausible single-flush volume, chunking is unnecessary and divergent — drop it. Removes _chunk_by_size, _MAX_BATCH_BYTES, and the four chunking tests. _flush now POSTs the entire buffered batch as one request. 
--- socketsecurity/core/log_uploader.py | 44 +++++----------------------- tests/unit/test_log_uploader.py | 45 ----------------------------- 2 files changed, 8 insertions(+), 81 deletions(-) diff --git a/socketsecurity/core/log_uploader.py b/socketsecurity/core/log_uploader.py index cbc2c4c..8c4c6f3 100644 --- a/socketsecurity/core/log_uploader.py +++ b/socketsecurity/core/log_uploader.py @@ -26,8 +26,6 @@ _FLUSH_GUARD = threading.local() -_MAX_BATCH_BYTES = 256 * 1024 - 1024 # depscan body cap is 256KB; reserve headroom for envelope/headers - _LEVEL_MAP = { logging.DEBUG: "DEBUG", logging.INFO: "INFO", @@ -88,48 +86,22 @@ def _flush(self) -> None: with self._lock: if not self._buf: return - entries = self._buf + batch = self._buf self._buf = [] _FLUSH_GUARD.active = True try: - for chunk in _chunk_by_size(entries): - try: - self._client.request( - path=f"python-cli-runs/{self._run_id}/logs", - method="POST", - payload=json.dumps({"logs": chunk}), - ) - except Exception as e: - log.debug(f"log upload failed (swallowed, {len(chunk)} entries dropped): {e}") + self._client.request( + path=f"python-cli-runs/{self._run_id}/logs", + method="POST", + payload=json.dumps({"logs": batch}), + ) + except Exception as e: + log.debug(f"log upload failed (swallowed, {len(batch)} entries dropped): {e}") finally: _FLUSH_GUARD.active = False -def _chunk_by_size(entries: list) -> list: - """Split entries into chunks that each serialize to <= _MAX_BATCH_BYTES. 
- Single entries that exceed the cap are dropped with a debug log.""" - chunks: list = [] - current: list = [] - envelope = len('{"logs":[]}') - current_size = envelope - for entry in entries: - entry_size = len(json.dumps(entry)) + 1 # +1 for inter-entry comma - if entry_size + envelope > _MAX_BATCH_BYTES: - log.debug(f"log entry too large ({entry_size}B), dropped") - continue - if current and current_size + entry_size > _MAX_BATCH_BYTES: - chunks.append(current) - current = [entry] - current_size = envelope + entry_size - else: - current.append(entry) - current_size += entry_size - if current: - chunks.append(current) - return chunks - - class UploadingLogHandler(logging.Handler): def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"): super().__init__() diff --git a/tests/unit/test_log_uploader.py b/tests/unit/test_log_uploader.py index 331b885..5555454 100644 --- a/tests/unit/test_log_uploader.py +++ b/tests/unit/test_log_uploader.py @@ -8,10 +8,8 @@ from socketsecurity.core.cli_client import CliClient from socketsecurity.core.exceptions import APIFailure from socketsecurity.core.log_uploader import ( - _MAX_BATCH_BYTES, BatchedLogUploader, UploadingLogHandler, - _chunk_by_size, ) from socketsecurity.core.socket_config import SocketConfig @@ -140,49 +138,6 @@ def test_levels_map_correctly(): assert levels == ["DEBUG", "INFO", "WARN", "ERROR", "ERROR"] -def test_chunk_by_size_keeps_small_batches_intact(): - entries = [{"timestamp": "t", "level": "INFO", "message": "x", "context": "c"}] * 5 - chunks = _chunk_by_size(entries) - assert len(chunks) == 1 - assert chunks[0] == entries - - -def test_chunk_by_size_splits_when_exceeding_cap(): - big_msg = "y" * 1000 - entries = [ - {"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", - "message": big_msg, "context": "c"} - for _ in range(500) - ] - chunks = _chunk_by_size(entries) - assert len(chunks) >= 2 - for chunk in chunks: - size = len(json.dumps({"logs": chunk})) - assert size <= 
_MAX_BATCH_BYTES - assert sum(len(c) for c in chunks) == len(entries) - - -def test_chunk_by_size_drops_single_oversize_entry(): - too_big = {"timestamp": "t", "level": "INFO", - "message": "z" * (_MAX_BATCH_BYTES + 100), "context": "c"} - ok = {"timestamp": "t", "level": "INFO", "message": "ok", "context": "c"} - chunks = _chunk_by_size([ok, too_big, ok]) - flat = [e for c in chunks for e in c] - assert flat == [ok, ok] # too_big dropped, smalls preserved - - -def test_flush_chunks_oversize_buffer_into_multiple_posts(): - client = Mock(spec=CliClient) - u = BatchedLogUploader(client, "run-c", flush_interval=10) - big_msg = "y" * 1000 - for _ in range(500): - u.add({"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", - "message": big_msg, "context": "c"}) - - u._flush() - assert client.request.call_count >= 2 # split into multiple POSTs - - def test_run_thread_flushes_periodically_then_exits(): client = Mock(spec=CliClient) u = BatchedLogUploader(client, "run-t", flush_interval=0.05) From 03b61266b9a601c2c771c505731e6f91abafd174 Mon Sep 17 00:00:00 2001 From: barslev Date: Fri, 15 May 2026 12:19:24 +0200 Subject: [PATCH 3/5] chore: drop integration field from cli-run register payload The server-side handler now rejects unknown fields and the integration column has been removed from the schema (it was plumbed end-to-end but never displayed, filtered, or grouped on). Stop sending it. Removes the integration parameter from register_cli_run and setup_streaming, drops the corresponding wiring in socketcli.py, and prunes the now-pointless test_register_cli_run_omits_integration_when_falsy case. 
--- socketsecurity/core/cli_run.py | 6 +----- socketsecurity/core/streaming.py | 2 -- socketsecurity/socketcli.py | 1 - tests/unit/test_cli_run.py | 14 ++------------ tests/unit/test_streaming.py | 4 ---- 5 files changed, 3 insertions(+), 24 deletions(-) diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py index 205e812..e3fd428 100644 --- a/socketsecurity/core/cli_run.py +++ b/socketsecurity/core/cli_run.py @@ -23,16 +23,12 @@ def register_cli_run( client: CliClient, client_version: str, - integration: Optional[str] = None, ) -> Optional[str]: - payload = {"client_version": client_version} - if integration: - payload["integration"] = integration try: resp = client.request( path="python-cli-runs", method="POST", - payload=json.dumps(payload), + payload=json.dumps({"client_version": client_version}), ) except APIFailure as e: log.debug(f"cli-run register failed (streaming disabled): {e}") diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py index cd6ede9..13a4b2f 100644 --- a/socketsecurity/core/streaming.py +++ b/socketsecurity/core/streaming.py @@ -30,13 +30,11 @@ def setup_streaming( cli_logger: logging.Logger, sdk_logger: logging.Logger, client_version: str, - integration: Optional[str], enable_debug: bool, ) -> Optional[Callable[[], None]]: run_id = register_cli_run( client, client_version=client_version, - integration=integration, ) if not run_id: cli_logger.debug("server log streaming disabled (register failed)") diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index ea5e73e..a162510 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -104,7 +104,6 @@ def main_code(): cli_logger=log, sdk_logger=socket_logger, client_version=config.version, - integration=config.integration_type, enable_debug=config.enable_debug, ) if teardown: diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py index 1c171bd..0861ad5 100644 --- a/tests/unit/test_cli_run.py +++ 
b/tests/unit/test_cli_run.py @@ -16,14 +16,14 @@ def test_register_cli_run_returns_run_id(): client = Mock(spec=CliClient) client.request.return_value = _resp({"run_id": "srv-issued-123"}) - run_id = register_cli_run(client, client_version="1.2.3", integration="github") + run_id = register_cli_run(client, client_version="1.2.3") assert run_id == "srv-issued-123" args, kwargs = client.request.call_args assert kwargs["path"] == "python-cli-runs" assert kwargs["method"] == "POST" body = json.loads(kwargs["payload"]) - assert body == {"client_version": "1.2.3", "integration": "github"} + assert body == {"client_version": "1.2.3"} def test_register_cli_run_returns_none_on_api_failure(): @@ -49,16 +49,6 @@ def test_register_cli_run_returns_none_on_bad_json(): assert register_cli_run(client, client_version="1.0.0") is None -def test_register_cli_run_omits_integration_when_falsy(): - client = Mock(spec=CliClient) - client.request.return_value = _resp({"run_id": "x"}) - - register_cli_run(client, client_version="1.0.0", integration=None) - - body = json.loads(client.request.call_args.kwargs["payload"]) - assert body == {"client_version": "1.0.0"} - - def test_finalize_cli_run_posts_status(): client = Mock(spec=CliClient) finalize_cli_run(client, "run-x", status="failure") diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index c20334d..967e60d 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -21,7 +21,6 @@ def test_setup_streaming_returns_none_when_register_fails(): cli_logger=logging.getLogger("t-fail"), sdk_logger=logging.getLogger("t-fail-sdk"), client_version="1.0", - integration=None, enable_debug=False, ) assert teardown is None @@ -45,7 +44,6 @@ def fake_finalize(client, run_id, status="success"): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", - integration=None, enable_debug=False, ) assert teardown is not None @@ -74,7 +72,6 @@ def fake_finalize(client, run_id, status="success"): 
cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", - integration=None, enable_debug=False, ) teardown() @@ -100,7 +97,6 @@ def test_setup_streaming_restores_logger_state_on_teardown(): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", - integration=None, enable_debug=False, ) # During streaming: levels and propagate are forced From d18795dab70c2625b2aaa7822063d58fd42d5482 Mon Sep 17 00:00:00 2001 From: barslev Date: Fri, 15 May 2026 13:15:00 +0200 Subject: [PATCH 4/5] feat: link cli-run to its full_scan via report_run_id on finalize MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The depscan side now joins cli_run → full_scans → repositories via the report_run_id field to surface the scanned repo in the admin dashboard view of each CLI run. Wire the CLI to send the full_scan_id (== the report_run_id depscan expects) when it has one. - finalize_cli_run accepts an optional report_run_id and includes it (nullable) in the POST body. - streaming.py adds a module-level _report_run_id holder and a set_report_run_id() setter; teardown passes it through to finalize. - socketcli.py captures diff.id at a single chokepoint after the diff-producing branches converge, guarded against the NO_DIFF_RAN / NO_SCAN_RAN sentinel values. The field is nullable end-to-end so CLI invocations that fail before producing a diff (or are run in modes that don't create one) still finalize cleanly. 
--- socketsecurity/core/cli_run.py | 3 ++- socketsecurity/core/streaming.py | 8 +++++++- socketsecurity/socketcli.py | 5 ++++- tests/unit/test_cli_run.py | 12 ++++++++++-- tests/unit/test_streaming.py | 23 +++++++++++++++-------- 5 files changed, 38 insertions(+), 13 deletions(-) diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py index e3fd428..651ec04 100644 --- a/socketsecurity/core/cli_run.py +++ b/socketsecurity/core/cli_run.py @@ -51,12 +51,13 @@ def finalize_cli_run( client: CliClient, run_id: str, status: str = "success", + report_run_id: Optional[str] = None, ) -> None: try: client.request( path=f"python-cli-runs/{run_id}/finalize", method="POST", - payload=json.dumps({"status": status}), + payload=json.dumps({"status": status, "report_run_id": report_run_id}), ) except Exception as e: log.debug(f"cli-run finalize failed (swallowed): {e}") diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py index 13a4b2f..aeefed5 100644 --- a/socketsecurity/core/streaming.py +++ b/socketsecurity/core/streaming.py @@ -17,6 +17,7 @@ from .log_uploader import BatchedLogUploader, UploadingLogHandler _run_status: str = "success" +_report_run_id: Optional[str] = None def set_run_status(status: str) -> None: @@ -24,6 +25,11 @@ def set_run_status(status: str) -> None: _run_status = status +def set_report_run_id(report_run_id: Optional[str]) -> None: + global _report_run_id + _report_run_id = report_run_id + + def setup_streaming( *, client: CliClient, @@ -66,7 +72,7 @@ def teardown() -> None: cli_logger.removeHandler(upload_handler) sdk_logger.removeHandler(upload_handler) log_uploader.stop() - finalize_cli_run(client, run_id, status=_run_status) + finalize_cli_run(client, run_id, status=_run_status, report_run_id=_report_run_id) cli_logger.removeHandler(terminal_handler) sdk_logger.removeHandler(terminal_handler) cli_logger.setLevel(saved_levels[0]) diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index 
a162510..f3b4d7c 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -21,7 +21,7 @@ from socketsecurity.core.messages import Messages from socketsecurity.core.scm_comments import Comments from socketsecurity.core.socket_config import SocketConfig -from socketsecurity.core.streaming import set_run_status, setup_streaming +from socketsecurity.core.streaming import set_report_run_id, set_run_status, setup_streaming from socketsecurity.output import OutputHandler socket_logger, log = initialize_logging() @@ -761,6 +761,9 @@ def _is_unprocessed(c): ) output_handler.handle_output(diff) + if diff.id not in ("NO_DIFF_RAN", "NO_SCAN_RAN"): + set_report_run_id(diff.id) + # Handle license generation if not should_skip_scan and diff.id != "NO_DIFF_RAN" and diff.id != "NO_SCAN_RAN" and config.generate_license: all_packages = {} diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py index 0861ad5..749acfd 100644 --- a/tests/unit/test_cli_run.py +++ b/tests/unit/test_cli_run.py @@ -49,14 +49,22 @@ def test_register_cli_run_returns_none_on_bad_json(): assert register_cli_run(client, client_version="1.0.0") is None -def test_finalize_cli_run_posts_status(): +def test_finalize_cli_run_posts_status_and_null_report_run_id_by_default(): client = Mock(spec=CliClient) finalize_cli_run(client, "run-x", status="failure") args, kwargs = client.request.call_args assert kwargs["path"] == "python-cli-runs/run-x/finalize" assert kwargs["method"] == "POST" - assert json.loads(kwargs["payload"]) == {"status": "failure"} + assert json.loads(kwargs["payload"]) == {"status": "failure", "report_run_id": None} + + +def test_finalize_cli_run_includes_report_run_id_when_provided(): + client = Mock(spec=CliClient) + finalize_cli_run(client, "run-x", status="success", report_run_id="fs-abc") + + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body == {"status": "success", "report_run_id": "fs-abc"} def test_finalize_cli_run_swallows_errors(): 
diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 967e60d..eb14b35 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -4,14 +4,20 @@ import pytest import socketsecurity.core.streaming as streaming_mod -from socketsecurity.core.streaming import set_run_status, setup_streaming +from socketsecurity.core.streaming import ( + set_report_run_id, + set_run_status, + setup_streaming, +) @pytest.fixture(autouse=True) -def reset_run_status(): +def reset_streaming_state(): streaming_mod._run_status = "success" + streaming_mod._report_run_id = None yield streaming_mod._run_status = "success" + streaming_mod._report_run_id = None def test_setup_streaming_returns_none_when_register_fails(): @@ -32,8 +38,8 @@ def test_teardown_finalizes_with_current_run_status(): finalize_calls = [] - def fake_finalize(client, run_id, status="success"): - finalize_calls.append(status) + def fake_finalize(client, run_id, status="success", report_run_id=None): + finalize_calls.append((status, report_run_id)) with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-1"), \ patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ @@ -49,9 +55,10 @@ def fake_finalize(client, run_id, status="success"): assert teardown is not None set_run_status("failure") + set_report_run_id("fs-xyz") teardown() - assert finalize_calls == ["failure"] + assert finalize_calls == [("failure", "fs-xyz")] def test_set_run_status_default_is_success(): @@ -60,8 +67,8 @@ def test_set_run_status_default_is_success(): finalize_calls = [] - def fake_finalize(client, run_id, status="success"): - finalize_calls.append(status) + def fake_finalize(client, run_id, status="success", report_run_id=None): + finalize_calls.append((status, report_run_id)) with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-2"), \ patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ @@ -76,7 
+83,7 @@ def fake_finalize(client, run_id, status="success"): ) teardown() - assert finalize_calls == ["success"] + assert finalize_calls == [("success", None)] def test_setup_streaming_restores_logger_state_on_teardown(): From 0e8746d75aafd527bd2c9b9673b6cab94653a9c3 Mon Sep 17 00:00:00 2001 From: barslev Date: Sat, 16 May 2026 16:55:09 +0200 Subject: [PATCH 5/5] chore: bump version to 2.2.87 for streaming logs feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - socketsecurity/__init__.py: __version__ → 2.2.87 - pyproject.toml: version → 2.2.87 - CHANGELOG.md: new 2.2.87 entry describing the streaming-logs feature Required by .github/workflows/version-check.yml, which fails the PR if the version isn't incremented relative to main. --- CHANGELOG.md | 4 ++++ pyproject.toml | 2 +- socketsecurity/__init__.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e2ea6c0..1f3dbd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 2.2.87 + +- Added a streaming log channel between the CLI and the Socket backend. Each CLI invocation now reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`) and uploads a transcript of its own log output, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged. The feature is best-effort — registration or upload failures silently degrade and never block the scan. Opt out with `--disable-server-log-streaming`. + ## 2.2.83 - Fixed branch detection in detached-HEAD CI checkouts. When `git name-rev --name-only HEAD` returned an output with a suffix operator (e.g. `remotes/origin/master~1`, `master^0`), the `~N`/`^N` was previously passed through as the branch name and rejected by the Socket API as an invalid Git ref. The suffix is now stripped before the prefix split, producing the bare branch name. 
diff --git a/pyproject.toml b/pyproject.toml index 49bb294..feaa859 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "socketsecurity" -version = "2.2.86" +version = "2.2.87" requires-python = ">= 3.11" license = {"file" = "LICENSE"} dependencies = [ diff --git a/socketsecurity/__init__.py b/socketsecurity/__init__.py index c816fab..69d6f7b 100644 --- a/socketsecurity/__init__.py +++ b/socketsecurity/__init__.py @@ -1,3 +1,3 @@ __author__ = 'socket.dev' -__version__ = '2.2.86' +__version__ = '2.2.87' USER_AGENT = f'SocketPythonCLI/{__version__}'