from datetime import datetime, timezone from dulwich import porcelain from dulwich.repo import MemoryRepo from fastapi import FastAPI from fastapi.responses import JSONResponse import json import uuid class SCIMResponse(JSONResponse): media_type = "application/scim+json" #Clone repo into memory local_repo = MemoryRepo() local_repo.refs.set_symbolic_ref(b"HEAD", b"refs/heads/main") fetch_result = porcelain.fetch(local_repo, "ssh://git@platform.sunet.se:22022/mifr/scim-source.git") local_repo.refs[b"refs/heads/main"] = fetch_result.refs[b"refs/heads/main"] last_tree = local_repo[local_repo[b"HEAD"].tree] app = FastAPI(default_response_class=SCIMResponse) def list_below(store, treeid, base): for name, mode, sha in store[treeid.id].iteritems(): if not base: base = b"" if name == base: for name, mode, sha in store[sha].iteritems(): yield name, mode, sha @app.get("/") def read_root(): return {"Hello": "World"} @app.get("/v2/Users") def read_all_user(): all_entries = list(list_below(local_repo, last_tree, b'Users')) response = {} response["schemas"] = ("urn:ietf:params:scim:api:messages:2.0:ListResponse") response["totalResults"] = len(all_entries) response['Resources'] = [] for entry in all_entries: response['Resources'].append(read_user(entry[0].split(b'.json')[0].decode('utf-8'))) return response @app.get("/v2/Users/{item_id}") def read_user(item_id: uuid.UUID): try: path = f"Users/{item_id}.json" blob = porcelain.get_object_by_path(local_repo, path, None) user = json.loads(blob.data) commits = [w.commit for w in local_repo.get_walker(paths=[path.encode()])] #import pdb;pdb.set_trace() last_mod = commits[0].author_time created = commits[-1].author_time user['meta'] = {} user['meta']['resourceType'] = 'User' user['meta']['version'] = commits[0].sha().hexdigest() user['meta']['created'] = datetime.fromtimestamp(created , tz=timezone.utc).isoformat() user['meta']['lastModified'] = datetime.fromtimestamp(last_mod, tz=timezone.utc).isoformat() user['meta']['location'] = "https://example.com/v2/Users/b181c1ec-46a9-40e1-965d-c6c78de47cb9" return user except KeyError: return("Panic, no file found. ")