import rdflib
import requests
import yaml
import os
import timeit
from collections import defaultdict
from SPARQLWrapper import SPARQLWrapper, JSON, BASIC, POST
from tqdm import tqdm
from acdh_arche_pyutils.utils import (
camel_to_snake,
create_query_sting,
id_from_uri
)
class ArcheApiClient:
    """Main class to interact with the ARCHE API."""

    # Maps the file-extension style ``format`` argument used by this class to
    # the matching rdflib serializer name.
    _SERIALIZER_NAMES = {
        'nt': 'nt',
        'ttl': 'turtle',
        'xml': 'xml',
    }

    def __init__(self, arche_endpoint, out_dir='.'):
        """Initialize the client and fetch the endpoint's self-description.

        Requests ``<arche_endpoint>describe``, parses the response as YAML and
        stores its ``rest`` and ``schema`` sections. Every string-valued entry
        of ``schema`` and of ``schema['classes']`` is additionally exposed as
        an instance attribute named ``camel_to_snake(key)``. Other methods of
        this class read ``self.label``, ``self.parent`` and
        ``self.top_collection``, which are only set through that mechanism.

        :param arche_endpoint: The ARCHE endpoint, including the trailing
            slash, e.g. `https://arche-dev.acdh-dev.oeaw.ac.at/api/`
        :type arche_endpoint: str
        :param out_dir: a path to serialize data to, defaults to '.'
        :type out_dir: str
        :return: An ArcheApiClient instance
        :rtype: class:`acdh_arche_pyutils.client.ArcheApiClient`
        """
        super().__init__()
        self.endpoint = arche_endpoint
        self.out_dir = out_dir
        self.describe_url = f"{arche_endpoint}describe"
        print(f'Fetching description for endpoint: {self.endpoint}')
        self.info = requests.get(self.describe_url)
        # NOTE(security): the YAML document comes from a remote server.
        # FullLoader is more permissive than SafeLoader; consider
        # ``yaml.safe_load`` if the description only uses plain YAML types.
        self.description = yaml.load(self.info.text, Loader=yaml.FullLoader)
        self.rest = self.description['rest']
        self.schema = self.description['schema']
        self.base_url = self.rest['urlBase']
        self.path_base = self.rest['pathBase']
        self.fetched_endpoint = f"{self.base_url}{self.path_base}"
        # Expose the string-valued schema entries as snake_case attributes.
        for key, value in self.schema.items():
            if isinstance(value, str):
                setattr(self, camel_to_snake(key), value)
        for key, value in self.schema['classes'].items():
            if isinstance(value, str):
                setattr(self, camel_to_snake(key), value)
        # Maps the ``format`` argument of ``get_resource`` to the value sent
        # as the ``format`` query parameter.
        # NOTE(review): '+' is already percent-encoded in the RDF/XML entry;
        # presumably ``create_query_sting`` does not encode values again -
        # TODO confirm.
        self.serialization_map = {
            'nt': 'application/n-triples',
            'ttl': 'text/turtle',
            'xml': 'application/rdf%2Bxml'
        }

    def top_col_ids(self):
        """Return the resources typed as top collection with their labels.

        Runs a search for resources whose ``rdf:type`` is
        ``self.top_collection``, parses the turtle response and groups the
        objects of the ``self.label`` predicate by subject.

        :return: A list of ``(subject URI, list of label values)`` tuples
        :rtype: list
        """
        items = defaultdict(list)
        query_params = {
            "property[0]": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
            "value[0]": self.top_collection,
            "readMode": 'ids'
        }
        query_string = create_query_sting(query_params)
        r = requests.get(f"{self.fetched_endpoint}search?{query_string}")
        g = rdflib.Graph().parse(data=r.text, format='ttl')
        label_predicate = rdflib.URIRef(self.label)
        for subject, label in g.subject_objects(predicate=label_predicate):
            items[subject].append(label)
        # Subjects are converted to plain strings, label values are kept as
        # returned by rdflib.
        return [(f"{key}", value) for key, value in items.items()]

    def get_resource(self, res_uri, format='nt'):
        """Fetch the metadata of a resource and its relatives as a graph.

        The request uses ``readMode=relatives`` with ``self.parent`` as the
        parent property.

        :param res_uri: an ARCHE URI
        :type res_uri: str
        :param format: One of the keys of ``self.serialization_map``
            ('nt', 'ttl' or 'xml'), defaults to 'nt'
        :type format: str
        :raises KeyError: if ``format`` is not a key of
            ``self.serialization_map``
        :return: A `rdflib.Graph` object
        :rtype: rdflib.Graph
        """
        query_params = {
            "readMode": "relatives",
            "parentProperty": self.parent,
            "format": self.serialization_map[format]
        }
        query_string = create_query_sting(query_params)
        url = f"{res_uri}/metadata?{query_string}"
        print(f"fetching and parsing data for URI: {res_uri}, calling endpoint \n {url}")
        start = timeit.default_timer()
        g = rdflib.Graph().parse(location=url, format=format)
        stop = timeit.default_timer()
        print(f"fetching and parsing done in {stop - start}")
        return g

    @staticmethod
    def _rdflib_format(format):
        """Translate a file-extension style format into an rdflib serializer name.

        Unknown values fall back to 'xml'.
        """
        return ArcheApiClient._SERIALIZER_NAMES.get(format, 'xml')

    def write_resource_to_file(self, res_uri, format='nt'):
        """Write a resource (and its relatives) to a file below ``self.out_dir``.

        The file is named ``<id_from_uri(res_uri)>.<format>`` and is serialized
        in the format that matches its extension.

        :param res_uri: An ARCHE-URI
        :type res_uri: str
        :param format: The serialisation format, defaults to 'nt' ->
            N-Triples; use 'ttl' for turtle and 'xml' for RDF/XML. Any other
            value is serialized as RDF/XML.
        :type format: str
        :return: The location of the file
        :rtype: str
        """
        file_name = f"{id_from_uri(res_uri)}.{format}"
        save_path = os.path.join(self.out_dir, file_name)
        target_dir = os.path.dirname(save_path)
        # ``os.makedirs('')`` raises, so only create a directory when the
        # path actually has a directory component.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        g = self.get_resource(res_uri)
        g.serialize(
            save_path,
            format=self._rdflib_format(format),
            encoding='utf8'
        )
        return save_path
class ArcheToTripleStore(ArcheApiClient):
    """A class to post ARCHE data to a triple store."""

    def __init__(
        self,
        triple_store,
        user=None,
        pw=None,
        headers=None,
        **kwargs
    ):
        """Initialize the ARCHE client and remember the triple store settings.

        :param triple_store: URL of the triple store; used both as SPARQL
            endpoint and as target for posting serialized data
        :type triple_store: str
        :param user: user name for the triple store, defaults to None
        :type user: str
        :param pw: password for the triple store, defaults to None
        :type pw: str
        :param headers: HTTP headers sent when posting data, defaults to
            ``{'Content-Type': 'text/turtle;charset=utf-8'}``
        :type headers: dict
        :param kwargs: passed on to :class:`ArcheApiClient`
            (``arche_endpoint``, ``out_dir``)
        """
        super().__init__(**kwargs)
        self.triple_store = triple_store
        self.user = user
        self.pw = pw
        # Build the default per instance so that instances never share (and
        # accidentally mutate) one module-level dict.
        if headers is None:
            headers = {'Content-Type': 'text/turtle;charset=utf-8'}
        self.headers = headers
        self.queries = {
            'count_triples': "SELECT (count(*) as ?cnt) WHERE {?s ?p ?o.}",
            'delete_all': "DELETE { ?s ?p ?o } WHERE { ?s ?p ?o }"
        }

    def count_triples(self):
        """Count all existing triples in the triple store.

        :return: The triple count; the raw value is returned unchanged when
            it cannot be converted to ``int``
        :rtype: int
        """
        sparql_query = self.queries['count_triples']
        sparql_serv = SPARQLWrapper(self.triple_store)
        sparql_serv.setReturnFormat(JSON)
        sparql_serv.setQuery(sparql_query)
        result = sparql_serv.query().convert()
        triple_count = result['results']['bindings'][0]['cnt']['value']
        try:
            return int(triple_count)
        except ValueError:
            return triple_count

    def delete_triples(self):
        """Delete every triple in the triple store.

        Sends the ``delete_all`` update via POST with HTTP basic auth using
        ``self.user`` and ``self.pw``.

        :return: The converted response of the SPARQL endpoint
        """
        sparql_query = self.queries['delete_all']
        sparql_serv = SPARQLWrapper(self.triple_store)
        sparql_serv.setHTTPAuth(BASIC)
        sparql_serv.setMethod(POST)
        sparql_serv.setCredentials(self.user, self.pw)
        sparql_serv.setReturnFormat(JSON)
        sparql_serv.setQuery(sparql_query)
        result = sparql_serv.query().convert()
        return result

    def post_resource(self, res_id):
        """Fetch the given resource from ARCHE and post it to the triple store.

        :param res_id: An ARCHE-URI
        :type res_id: str
        :return: The HTTP status code of the response and its body; on a
            connection error ``[500, <error message>]`` is returned instead
        :rtype: list
        """
        res = self.get_resource(res_id)
        # Only send credentials when a user is configured; a (None, None)
        # tuple would otherwise be turned into a bogus Authorization header.
        auth = (self.user, self.pw) if self.user is not None else None
        try:
            r = requests.post(
                self.triple_store,
                headers=self.headers,
                auth=auth,
                # Serialize to UTF-8 bytes so the body matches the charset
                # declared in the default Content-Type header and non-Latin-1
                # characters do not break the upload.
                data=res.serialize(format='ttl', encoding='utf-8')
            )
        except requests.ConnectionError as e:
            return [500, f"{e}"]
        return [r.status_code, r.text]

    def post_all_resources(self):
        """Post all top collections to the triple store.

        The response of every single upload is printed, not collected.

        :return: The string ``"done"`` once all resources were processed
        :rtype: str
        """
        top_ids = [x[0] for x in self.top_col_ids()]
        for x in tqdm(top_ids, total=len(top_ids)):
            response = self.post_resource(x)
            print(
                f"posting data for URI: {x};"
                f"\n import response: \n {response}"
            )
        return "done"