63 lines
2.3 KiB
Python
63 lines
2.3 KiB
Python
from datetime import datetime, timezone
|
|
from dulwich import porcelain
|
|
from dulwich.repo import MemoryRepo
|
|
from fastapi import FastAPI
|
|
from fastapi.responses import JSONResponse
|
|
import json
|
|
import uuid
|
|
|
|
class SCIMResponse(JSONResponse):
    """JSON response that is labelled with the SCIM media type."""

    # Replaces the media type inherited from JSONResponse.
    media_type = "application/scim+json"
|
|
|
|
# Fetch the remote repository into an in-memory repo when the module is
# imported; the route handlers below read user data from it.
_REMOTE_URL = "ssh://git@platform.sunet.se:22022/mifr/scim-source.git"
_MAIN_REF = b"refs/heads/main"

local_repo = MemoryRepo()
local_repo.refs.set_symbolic_ref(b"HEAD", _MAIN_REF)
fetch_result = porcelain.fetch(local_repo, _REMOTE_URL)

# Point the local main branch at whatever the fetch reported for that ref.
local_repo.refs[_MAIN_REF] = fetch_result.refs[_MAIN_REF]

# Tree object of the commit HEAD resolves to.
last_tree = local_repo[local_repo[b"HEAD"].tree]

# Handlers respond through SCIMResponse unless a route says otherwise.
app = FastAPI(default_response_class=SCIMResponse)
|
|
|
|
def list_below(store, treeid, base):
    """Yield the entries of the child named *base* directly under a tree.

    Args:
        store: Mapping-like object store indexed by object id; the objects
            it returns must provide ``iteritems()``.
        treeid: Object whose ``id`` attribute identifies the parent tree.
        base: Name of the child entry to descend into. A falsy value is
            treated as ``b""``.

    Yields:
        ``(name, mode, sha)`` tuples for every entry of each child whose
        name equals *base*. Nothing is yielded when no entry matches.
    """
    wanted = base or b""
    for entry_name, _entry_mode, entry_sha in store[treeid.id].iteritems():
        if entry_name != wanted:
            continue
        # Descend exactly one level into the matching entry.
        for child_name, child_mode, child_sha in store[entry_sha].iteritems():
            yield child_name, child_mode, child_sha
|
|
|
|
@app.get("/")
def read_root():
    """Return a fixed greeting payload for the root path."""
    greeting = {"Hello": "World"}
    return greeting
|
|
|
|
@app.get("/v2/Users")
def read_all_user():
    """Return every entry under ``Users`` in the fetched tree as a list response.

    Each entry name is cut at the first ``.json`` and passed to
    ``read_user``; whatever that returns is placed in ``Resources``.

    Returns:
        A dict with ``schemas``, ``totalResults`` and ``Resources`` keys.
    """
    all_entries = list(list_below(local_repo, last_tree, b'Users'))
    resources = [
        read_user(name.split(b'.json')[0].decode('utf-8'))
        for name, _mode, _sha in all_entries
    ]
    return {
        # "schemas" must serialise as a JSON array. The previous
        # parenthesised string was just a str, so it was emitted as a bare
        # string instead of a one-element array.
        "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
        "totalResults": len(all_entries),
        "Resources": resources,
    }
|
|
|
|
@app.get("/v2/Users/{item_id}")
def read_user(item_id: uuid.UUID):
    """Return the user stored at ``Users/<item_id>.json`` with a ``meta`` block.

    The JSON document is loaded from the in-memory repository and its
    ``meta`` key is replaced with values derived from the commits that
    touched the file.

    Args:
        item_id: Identifier of the user. ``read_all_user`` also calls this
            with a plain string, which formats into the path the same way.

    Returns:
        The user dict including ``meta``, or the string
        ``"Panic, no file found. "`` when a ``KeyError`` is raised while
        looking the user up.
    """
    try:
        path = f"Users/{item_id}.json"
        blob = porcelain.get_object_by_path(local_repo, path, None)
        user = json.loads(blob.data)
        # The code relies on the walker yielding newest commits first:
        # index 0 is taken as the latest change, index -1 as the creation.
        commits = [w.commit for w in local_repo.get_walker(paths=[path.encode()])]
        newest, oldest = commits[0], commits[-1]
        user['meta'] = {
            'resourceType': 'User',
            'version': newest.sha().hexdigest(),
            'created': datetime.fromtimestamp(oldest.author_time, tz=timezone.utc).isoformat(),
            'lastModified': datetime.fromtimestamp(newest.author_time, tz=timezone.utc).isoformat(),
            # Must identify the resource being returned; this used to be a
            # single hard-coded UUID shared by every user.
            'location': f"https://example.com/v2/Users/{item_id}",
        }
        return user
    except KeyError:
        # NOTE(review): this is sent as a normal (successful) response body;
        # a 404 with a SCIM error payload is probably what clients expect.
        # Left unchanged here because read_all_user consumes the return value.
        return("Panic, no file found. ")
|
|
|