diff --git a/src/openhound_github/resources/enterprise.py b/src/openhound_github/resources/enterprise.py index 09cec31..a5d114e 100644 --- a/src/openhound_github/resources/enterprise.py +++ b/src/openhound_github/resources/enterprise.py @@ -1,3 +1,4 @@ +import logging from dataclasses import dataclass from dlt.sources.helpers.rest_client.client import RESTClient @@ -28,6 +29,8 @@ EnterpriseUser, ) +logger = logging.getLogger(__name__) + @dataclass class SourceContext: @@ -51,17 +54,24 @@ def enterprise(ctx: SourceContext): "variables": {"slug": ctx.enterprise_name, "after": None}, } - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - page_enterprise = page_data[0].get("enterprise") - - if page_enterprise: - yield page_enterprise + try: + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + page_enterprise = page_data[0].get("enterprise") + + if page_enterprise: + yield page_enterprise + except Exception as e: + logger.error( + f"Error in resource 'enterprise' processing enterprise '{ctx.enterprise_name}': {e}", + extra={"resource": "enterprise", "phase": "resource_iteration"}, + ) + return @app.transformer( diff --git a/src/openhound_github/resources/organization.py b/src/openhound_github/resources/organization.py index 12996d6..2ca7524 100644 --- a/src/openhound_github/resources/organization.py +++ b/src/openhound_github/resources/organization.py @@ -1,5 +1,6 @@ import base64 import binascii +import logging from collections.abc import Iterable from dataclasses import dataclass, field from datetime import datetime @@ -73,6 +74,8 @@ from openhound_github.models.repo_role_assignment import TEAM_PERMISSION_MAP from openhound_github.models.repository_role import DEFAULT_REPO_ROLES +logger = logging.getLogger(__name__) + @dataclass class OrgContext: @@ -233,26 +236,33 @@ def organizations(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - org_data = client.get(f"/orgs/{org_name}").json() - - actions = _actions_permissions(ctx, client, org_name) - self_hosted_runners = _runner_permissions(ctx, client, org_name) - workflow_perms = _workflow_permissions(ctx, client, org_name) - - org_data["actions_enabled_repositories"] = actions.get("enabled_repositories") - org_data["actions_allowed_actions"] = actions.get("allowed_actions") - org_data["actions_sha_pinning_required"] = actions.get("sha_pinning_required") - org_data["self_hosted_runners_enabled_repositories"] = self_hosted_runners.get( - "enabled_repositories" - ) - org_data["default_workflow_permissions"] = workflow_perms.get( - "default_workflow_permissions" - ) - org_data["can_approve_pull_request_reviews"] = workflow_perms.get( - "can_approve_pull_request_reviews" - ) + try: + org_data = client.get(f"/orgs/{org_name}").json() + + actions = _actions_permissions(ctx, client, org_name) + self_hosted_runners = _runner_permissions(ctx, client, org_name) + workflow_perms = _workflow_permissions(ctx, client, org_name) + + org_data["actions_enabled_repositories"] = actions.get("enabled_repositories") + org_data["actions_allowed_actions"] = actions.get("allowed_actions") + org_data["actions_sha_pinning_required"] = actions.get("sha_pinning_required") + org_data["self_hosted_runners_enabled_repositories"] = self_hosted_runners.get( + "enabled_repositories" + ) + org_data["default_workflow_permissions"] = workflow_perms.get( + "default_workflow_permissions" + ) + org_data["can_approve_pull_request_reviews"] = workflow_perms.get( + "can_approve_pull_request_reviews" + ) - yield org_data + yield org_data + except Exception as e: + logger.error( + f"Error in resource 'organizations' processing organization '{org_name}': {e}", + extra={"resource": "organizations", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="org_roles", columns=OrgRole, parallelized=True) @@ -375,9 +385,16 @@ def app_installations(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate(f"/orgs/{org_name}/installations"): - for item in page: - yield {**item, "org_login": org_name} + try: + for page in client.paginate(f"/orgs/{org_name}/installations"): + for item in page: + yield {**item, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'app_installations' processing organization '{org_name}': {e}", + extra={"resource": "app_installations", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="applications", columns=App, parallelized=True) @@ -421,27 +438,34 @@ def users(ctx: SourceContext) -> Iterator[dict[str, Any]]: for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.membersWithRole.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - ) - data = { - "query": MEMBERS_WITH_ROLE_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.membersWithRole.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + ) + data = { + "query": MEMBERS_WITH_ROLE_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for edge in page_data[0]["organization"]["membersWithRole"]["edges"]: - node = edge.get("node", {}) - yield {**node, **edge, "org_login": org_name} + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for edge in page_data[0]["organization"]["membersWithRole"]["edges"]: + node = edge.get("node", {}) + yield {**node, **edge, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'users' processing organization '{org_name}': {e}", + extra={"resource": "users", "phase": "resource_iteration"}, + ) + continue @app.resource(name="teams", columns=Team, parallelized=True) @@ -465,26 +489,33 @@ def teams(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.teams.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - ) - data = { - "query": TEAMS_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - teams_data = page_data[0]["organization"]["teams"] - for team in teams_data["nodes"]: - yield {**team, "org_login": org_name} + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.teams.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + ) + data = { + "query": TEAMS_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + teams_data = page_data[0]["organization"]["teams"] + for team in teams_data["nodes"]: + yield {**team, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'teams' processing organization '{org_name}': {e}", + extra={"resource": "teams", "phase": "resource_iteration"}, + ) + continue @app.resource( @@ -498,12 +529,19 @@ def projected_enterprise_teams(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/teams", params={"per_page": 100} - ): - for team in page: - if str(team.get("slug", "")).startswith("ent:") and team.get("node_id"): - yield {**team, "org_login": org_name} + try: + for page in client.paginate( + f"/orgs/{org_name}/teams", params={"per_page": 100} + ): + for team in page: + if str(team.get("slug", "")).startswith("ent:") and team.get("node_id"): + yield {**team, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'projected_enterprise_teams' processing organization '{org_name}': {e}", + extra={"resource": "projected_enterprise_teams", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="team_roles", columns=TeamRole, parallelized=True) @@ -593,11 +631,18 @@ def actions_permissions(ctx): for org in ctx.organizations: org_name = org.org_name client = org.client - actions = _actions_permissions(ctx, client, org_name) - yield { - **actions, - "org_login": org_name, - } + try: + actions = _actions_permissions(ctx, client, org_name) + yield { + **actions, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'actions_permissions' processing organization '{org_name}': {e}", + extra={"resource": "actions_permissions", "phase": "resource_iteration"}, + ) + continue @app.resource(name="repositories", columns=Repository, parallelized=True) @@ -613,53 +658,60 @@ def repositories(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - actions = _actions_permissions(ctx, client, org_name) - runner_settings = _runner_permissions(ctx, client, org_name) - - enabled_repo_ids: set[str] | None = None - if actions.get("enabled_repositories") == "selected": - enabled_repo_ids = set() - for page in client.paginate( - f"/orgs/{org_name}/actions/permissions/repositories", - params={"per_page": 100}, - data_selector="repositories", - ): - enabled_repo_ids.update( - repo["node_id"] for repo in page if repo.get("node_id") - ) + try: + actions = _actions_permissions(ctx, client, org_name) + runner_settings = _runner_permissions(ctx, client, org_name) + + enabled_repo_ids: set[str] | None = None + if actions.get("enabled_repositories") == "selected": + enabled_repo_ids = set() + for page in client.paginate( + f"/orgs/{org_name}/actions/permissions/repositories", + params={"per_page": 100}, + data_selector="repositories", + ): + enabled_repo_ids.update( + repo["node_id"] for repo in page if repo.get("node_id") + ) + + runner_enabled_repo_ids: set[str] | None = None + if runner_settings.get("enabled_repositories") == "selected": + runner_enabled_repo_ids = set() + for page in client.paginate( + f"/orgs/{org_name}/actions/permissions/self-hosted-runners/repositories", + params={"per_page": 100}, + data_selector="repositories", + ): + runner_enabled_repo_ids.update( + repo["node_id"] for repo in page if repo.get("node_id") + ) - runner_enabled_repo_ids: set[str] | None = None - if runner_settings.get("enabled_repositories") == "selected": - runner_enabled_repo_ids = set() for page in client.paginate( - f"/orgs/{org_name}/actions/permissions/self-hosted-runners/repositories", - params={"per_page": 100}, - data_selector="repositories", + f"/orgs/{org_name}/repos", params={"per_page": 100} ): - runner_enabled_repo_ids.update( - repo["node_id"] for repo in page if repo.get("node_id") - ) - - for page in client.paginate( - f"/orgs/{org_name}/repos", params={"per_page": 100} - ): - for repo in page: - repo_node_id = repo.get("node_id") - actions_enabled = actions.get("enabled_repositories") == "all" or ( - enabled_repo_ids is not None and repo_node_id in enabled_repo_ids - ) - self_hosted_runners_enabled = runner_settings.get( - "enabled_repositories" - ) == "all" or ( - runner_enabled_repo_ids is not None - and repo_node_id in runner_enabled_repo_ids - ) - yield { - **repo, - "actions_enabled": actions_enabled, - "self_hosted_runners_enabled": self_hosted_runners_enabled, - "org_login": org_name, - } + for repo in page: + repo_node_id = repo.get("node_id") + actions_enabled = actions.get("enabled_repositories") == "all" or ( + enabled_repo_ids is not None and repo_node_id in enabled_repo_ids + ) + self_hosted_runners_enabled = runner_settings.get( + "enabled_repositories" + ) == "all" or ( + runner_enabled_repo_ids is not None + and repo_node_id in runner_enabled_repo_ids + ) + yield { + **repo, + "actions_enabled": actions_enabled, + "self_hosted_runners_enabled": self_hosted_runners_enabled, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'repositories' processing organization '{org_name}': {e}", + extra={"resource": "repositories", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -769,14 +821,21 @@ def repository_roles_base(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/custom-repository-roles", params={"per_page": 100} - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/custom-repository-roles", params={"per_page": 100} + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'repository_roles_base' processing organization '{org_name}': {e}", + extra={"resource": "repository_roles_base", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="repo_roles", columns=RepoRole, parallelized=True) @@ -842,27 +901,34 @@ def repositories_graphql(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.repositories.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - ) - data = { - "query": REPO_REFS_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.repositories.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + ) + data = { + "query": REPO_REFS_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - repos_page = page_data[0]["organization"]["repositories"] - for repo in repos_page["nodes"]: - yield {**repo, "org_login": org_name} + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + repos_page = page_data[0]["organization"]["repositories"] + for repo in repos_page["nodes"]: + yield {**repo, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'repositories_graphql' processing organization '{org_name}': {e}", + extra={"resource": "repositories_graphql", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="branches", columns=Branch, parallelized=True) @@ -1068,16 +1134,23 @@ def runner_groups(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/runner-groups", - params={"per_page": 100}, - data_selector="runner_groups", - ): - for group in page: - yield { - **group, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/runner-groups", + params={"per_page": 100}, + data_selector="runner_groups", + ): + for group in page: + yield { + **group, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'runner_groups' processing organization '{org_name}': {e}", + extra={"resource": "runner_groups", "phase": "resource_iteration"}, + ) + continue @app.resource(name="org_runners", columns=OrgRunner, parallelized=True) @@ -1085,16 +1158,23 @@ def org_runners(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/runners", - params={"per_page": 100}, - data_selector="runners", - ): - for runner in page: - yield { - **runner, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/runners", + params={"per_page": 100}, + data_selector="runners", + ): + for runner in page: + yield { + **runner, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'org_runners' processing organization '{org_name}': {e}", + extra={"resource": "org_runners", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -1114,22 +1194,19 @@ def org_runner_group_memberships( accessible_repo_node_ids = _selected_runner_group_repo_node_ids( group_row, client, org_name ) - try: - for runner_page in client.paginate( - f"/orgs/{org_name}/actions/runner-groups/{group.id}/runners", - params={"per_page": 100}, - data_selector="runners", - ): - for runner in runner_page: - yield { - "runner_group_id": group.id, - "runner_id": runner["id"], - "runner_group_visibility": group.visibility, - "accessible_repo_node_ids": accessible_repo_node_ids, - "org_login": org_name, - } - except Exception: - return + for runner_page in client.paginate( + f"/orgs/{org_name}/actions/runner-groups/{group.id}/runners", + params={"per_page": 100}, + data_selector="runners", + ): + for runner in runner_page: + yield { + "runner_group_id": group.id, + "runner_id": runner["id"], + "runner_group_visibility": group.visibility, + "accessible_repo_node_ids": accessible_repo_node_ids, + "org_login": org_name, + } @app.transformer(name="repo_runners", columns=RepoRunner, parallelized=True) @@ -1289,14 +1366,21 @@ def organization_secrets(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/secrets", params={"per_page": 100} - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/secrets", params={"per_page": 100} + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'organization_secrets' processing organization '{org_name}': {e}", + extra={"resource": "organization_secrets", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -1342,14 +1426,21 @@ def organization_variables(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/variables", params={"per_page": 100} - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/variables", params={"per_page": 100} + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'organization_variables' processing organization '{org_name}': {e}", + extra={"resource": "organization_variables", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -1455,33 +1546,40 @@ def secret_scanning_alerts(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/secret-scanning/alerts", params={"per_page": 100} - ): - for alert in page: - valid_token_user_node_id: str | None = None - secret = alert.get("secret") - if ( - alert.get("state") == "open" - and alert.get("secret_type") == "github_personal_access_token" - and secret - ): - try: - resp = requests.get( - "https://api.github.com/user", - headers={"Authorization": f"Bearer {secret}"}, - timeout=10, - ) - if resp.status_code == 200: - valid_token_user_node_id = resp.json().get("node_id") - except Exception: - pass - - yield { - **alert, - "valid_token_user_node_id": valid_token_user_node_id, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/secret-scanning/alerts", params={"per_page": 100} + ): + for alert in page: + valid_token_user_node_id: str | None = None + secret = alert.get("secret") + if ( + alert.get("state") == "open" + and alert.get("secret_type") == "github_personal_access_token" + and secret + ): + try: + resp = requests.get( + "https://api.github.com/user", + headers={"Authorization": f"Bearer {secret}"}, + timeout=10, + ) + if resp.status_code == 200: + valid_token_user_node_id = resp.json().get("node_id") + except Exception: + pass + + yield { + **alert, + "valid_token_user_node_id": valid_token_user_node_id, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'secret_scanning_alerts' processing organization '{org_name}': {e}", + extra={"resource": "secret_scanning_alerts", "phase": "resource_iteration"}, + ) + continue @app.resource( @@ -1504,14 +1602,21 @@ def personal_access_tokens(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/personal-access-tokens", params={"per_page": 100} - ): - for pat in page: - yield { - **pat, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/personal-access-tokens", params={"per_page": 100} + ): + for pat in page: + yield { + **pat, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'personal_access_tokens' processing organization '{org_name}': {e}", + extra={"resource": "personal_access_tokens", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="pat_repo_access", columns=PatRepoAccess, parallelized=True) @@ -1558,15 +1663,22 @@ def personal_access_token_requests(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/personal-access-token-requests", - params={"per_page": 100}, - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/personal-access-token-requests", + params={"per_page": 100}, + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'personal_access_token_requests' processing organization '{org_name}': {e}", + extra={"resource": "personal_access_token_requests", "phase": "resource_iteration"}, + ) + continue @app.resource(name="saml_provider", columns=SamlProvider, parallelized=True) @@ -1585,25 +1697,32 @@ def saml_provider(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - data = { - "query": SAML_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + data = { + "query": SAML_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - response = client.post("/graphql", json=data).json() - response_data = response.get("data", {}) - org_data = response_data.get("organization", {}) - if response_data and org_data: - idp = org_data.get("samlIdentityProvider") - if not idp: - continue + response = client.post("/graphql", json=data).json() + response_data = response.get("data", {}) + org_data = response_data.get("organization", {}) + if response_data and org_data: + idp = org_data.get("samlIdentityProvider") + if not idp: + continue - yield { - **idp, - "org_node_id": org_data["id"], - "org_name": org_data["name"], - "org_login": org_name, - } + yield { + **idp, + "org_node_id": org_data["id"], + "org_name": org_data["name"], + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'saml_provider' processing organization '{org_name}': {e}", + extra={"resource": "saml_provider", "phase": "resource_iteration"}, + ) + continue @app.resource(name="external_identities", columns=ExternalIdentity, parallelized=True) @@ -1620,34 +1739,41 @@ def external_identities(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.samlIdentityProvider.externalIdentities.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - allow_missing_page_info=True, - ) - data = { - "query": SAML_IDENTITIES_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.samlIdentityProvider.externalIdentities.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + allow_missing_page_info=True, + ) + data = { + "query": SAML_IDENTITIES_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for org in page_data: - org_data = org.get("organization") - idp = org_data.get("samlIdentityProvider") - if not idp: - continue - for identity in (idp.get("externalIdentities") or {}).get( - "nodes" - ) or []: - yield {**identity, "org_login": org_name} + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for org in page_data: + org_data = org.get("organization") + idp = org_data.get("samlIdentityProvider") + if not idp: + continue + for identity in (idp.get("externalIdentities") or {}).get( + "nodes" + ) or []: + yield {**identity, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'external_identities' processing organization '{org_name}': {e}", + extra={"resource": "external_identities", "phase": "resource_iteration"}, + ) + continue @app.resource(name="scim_users", columns=ScimResource, parallelized=True) @@ -1663,23 +1789,30 @@ def scim_users(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - scim_paginator = OffsetPaginator( - offset_param="startIndex", - limit_param="itemsPerPage", - limit=100, - total_path="totalResults", - ) - for page in client.paginate( - f"/scim/v2/organizations/{org_name}/Users", - params={"startIndex": 1, "itemsPerPage": 100}, - paginator=scim_paginator, - data_selector="Resources", - ): - for user in page: - yield { - **user, - "org_login": org_name, - } + try: + scim_paginator = OffsetPaginator( + offset_param="startIndex", + limit_param="itemsPerPage", + limit=100, + total_path="totalResults", + ) + for page in client.paginate( + f"/scim/v2/organizations/{org_name}/Users", + params={"startIndex": 1, "itemsPerPage": 100}, + paginator=scim_paginator, + data_selector="Resources", + ): + for user in page: + yield { + **user, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'scim_users' processing organization '{org_name}': {e}", + extra={"resource": "scim_users", "phase": "resource_iteration"}, + ) + continue def organization_resources(ctx: SourceContext): diff --git a/tests/test_error_resilience.py b/tests/test_error_resilience.py new file mode 100644 index 0000000..5f59289 --- /dev/null +++ b/tests/test_error_resilience.py @@ -0,0 +1,148 @@ +""" +Tests verifying error-resilient behavior in multi-org resource generators and transformers. + +After the fix: + Each org's processing is isolated in its own try/except block. A failure for + one org is caught, logged, and iteration continues with the next org — no data + is silently dropped. +""" +import inspect +import logging +from unittest.mock import MagicMock + +from openhound_github.resources.organization import ( + OrgContext, + SourceContext, + organizations, + users, +) + + +def _success_client(org_name: str) -> MagicMock: + """Mock RESTClient that returns a minimal org dict for every .get() call.""" + client = MagicMock() + client.get.return_value.json.return_value = { + "login": org_name, + "node_id": f"node_{org_name}", + } + return client + + +def _failing_client(error: Exception) -> MagicMock: + """Mock RESTClient that raises on any .get() call.""" + client = MagicMock() + client.get.side_effect = error + return client + + +def _graphql_page(org_login: str, logins: list[str]) -> list[dict]: + """Return one GraphQL page in the shape the `users` resource expects. + + `client.paginate(..., data_selector="data")` yields a list of the items + extracted from the `data` key. The `users` resource accesses + ``page_data[0]["organization"]["membersWithRole"]["edges"]``, so each + page must be a list whose first element is the parsed GraphQL data dict. + """ + edges = [{"node": {"id": f"id_{login}", "login": login}, "role": "MEMBER"} for login in logins] + return [ + { + "organization": { + "membersWithRole": { + "edges": edges, + "pageInfo": {"hasNextPage": False, "endCursor": None}, + } + } + } + ] + + +def test_rest_resource_continues_after_org_failure(caplog) -> None: + """A mid-iteration REST failure does NOT drop subsequent orgs. + + Setup: + org1 — succeeds (client.get returns valid org dict) + org2 — raises ConnectionError on its first API call + org3 — succeeds + + Expected (fixed behavior): + org1's data IS yielded. + org2 is skipped and an error is logged. + org3's data IS yielded — iteration continues after the failure. + """ + client1 = _success_client("org1") + client2 = _failing_client(ConnectionError("HTTP 500: Internal Server Error")) + client3 = _success_client("org3") + + ctx = SourceContext( + client=client1, + organizations=[ + OrgContext(client=client1, org_name="org1"), + OrgContext(client=client2, org_name="org2"), + OrgContext(client=client3, org_name="org3"), + ], + ) + + with caplog.at_level(logging.ERROR, logger="openhound_github.resources.organization"): + results = list(organizations.bind(ctx)) + + yielded_logins = {r.login for r in results} + + assert "org1" in yielded_logins, "org1 data should have been yielded before the error on org2" + assert "org3" in yielded_logins, "org3 data should still be yielded after org2 fails" + + assert any( + "Error in resource 'organizations' processing organization 'org2'" in msg + for msg in caplog.messages + ), "Expected an error log for org2" + + +def test_graphql_resource_continues_after_org_failure(caplog) -> None: + """A mid-iteration GraphQL failure does NOT drop subsequent orgs. + + Setup: + org1 — paginate succeeds, yields user1 + org2 — paginate raises ConnectionError + org3 — paginate succeeds, yields user3 + + Expected (fixed behavior): + user1 (org1) IS yielded. + org2 is skipped and an error is logged. + user3 (org3) IS yielded. + + Note: the raw generator is called directly (via ``_pipe.gen``) to stay + within the unit-test boundary and avoid DLT model-validation concerns. + """ + client1 = MagicMock() + client1.paginate.return_value = [_graphql_page("org1", ["user1"])] + + client2 = MagicMock() + client2.paginate.side_effect = ConnectionError("GraphQL endpoint unreachable") + + client3 = MagicMock() + client3.paginate.return_value = [_graphql_page("org3", ["user3"])] + + ctx = SourceContext( + client=client1, + organizations=[ + OrgContext(client=client1, org_name="org1"), + OrgContext(client=client2, org_name="org2"), + OrgContext(client=client3, org_name="org3"), + ], + ) + + # Call the raw generator function to bypass DLT pipeline machinery. + # parallelized=True wraps the pipe gen with wrap_parallel_iterator (functools.wraps), + # which yields deferred callables instead of data and loops forever in tests. + # inspect.unwrap() follows __wrapped__ back to the original generator function. + with caplog.at_level(logging.ERROR, logger="openhound_github.resources.organization"): + results = list(inspect.unwrap(users._pipe.gen)(ctx)) + + yielded_logins = {r["login"] for r in results} + + assert "user1" in yielded_logins, "user1 (org1) should have been yielded before org2 failed" + assert "user3" in yielded_logins, "user3 (org3) should still be yielded after org2 fails" + + assert any( + "Error in resource 'users' processing organization 'org2'" in msg + for msg in caplog.messages + ), "Expected an error log for org2"