from google.cloud import vision
from google.cloud import storage
from google.protobuf import json_format
from google.cloud import translate
from google.cloud import bigquery
from google.cloud import datastore
import logging
from google.api_core.exceptions import NotFound
import re
import time
import pandas as pd
import numpy as np
#!sudo pip3 install scispacy
import scispacy
from spacy import displacy
#https://github.com/explosion/spacy-models/releases/download/en_core_sci_sm-2.2.0/en_core_sci_sm-2.2.0.tar.gz
#https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.2.4/en_core_sci_lg-0.2.4.tar.gz
import en_core_sci_lg # en_ner_bionlp13cg_md, en_core_sci_sm
from scispacy.umls_linking import UmlsEntityLinker
from scispacy.abbreviation import AbbreviationDetector
Note:
This notebook was run on an AI Platform Notebooks instance. If you are running it on your local machine, you need to provide service account credentials in order to authenticate the API calls.
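For local runs, one minimal way to authenticate (a sketch; the key file path below is a placeholder) is to point GOOGLE_APPLICATION_CREDENTIALS at a downloaded service account key before the clients are created:
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account-key.json"  # placeholder path
# Alternatively, build explicit credentials and pass them to each client, e.g.:
# from google.oauth2 import service_account
# credentials = service_account.Credentials.from_service_account_file(
#     "/path/to/service-account-key.json")
# storage_client = storage.Client(credentials=credentials, project=project_id)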
# TODO: replace with your settings
project_id = "your_project_id"
location = "us-central1"
bucket_name = 'bucket_name'
bq_dataset_name = 'name_of_your_choice'
bq_table_name = 'name_of_your_choice_2'
storage_client = storage.Client()
vision_client = vision.ImageAnnotatorClient()
translate_client = translate.TranslationServiceClient()
datastore_client = datastore.Client()
bq_client = bigquery.Client()
def async_detect_document(vision_client, gcs_source_uri, gcs_destination_uri):
    """OCR with PDF/TIFF as source files on GCS"""
    doc_title = gcs_source_uri.split('/')[-1].split('.pdf')[0]
    
    # Supported mime_types are: 'application/pdf' and 'image/tiff'
    mime_type = 'application/pdf'
    # How many pages should be grouped into each json output file.
    batch_size= 20
    feature = vision.types.Feature(
        type=vision.enums.Feature.Type.DOCUMENT_TEXT_DETECTION)
    gcs_source = vision.types.GcsSource(uri=gcs_source_uri)
    input_config = vision.types.InputConfig(
        gcs_source=gcs_source, mime_type=mime_type)
    gcs_destination = vision.types.GcsDestination(uri=gcs_destination_uri)
    output_config = vision.types.OutputConfig(
        gcs_destination=gcs_destination, batch_size=batch_size)
    async_request = vision.types.AsyncAnnotateFileRequest(
        features=[feature], input_config=input_config,
        output_config=output_config)
    operation = vision_client.async_batch_annotate_files(
        requests=[async_request])
    #print('Waiting for the operation to finish.')
    operation.result(timeout=180)
    print('Text extraction from document {} is completed.'.format(doc_title))
# Once the request has completed and the output has been
# written to GCS, we can list all the output files.
def read_json_result(json_gcs_path, doc_title):
    gcs_destination_prefix = 'json/' + '{}-'.format(doc_title)
    
    # List objects with the given prefix.
    blob_list = list(storage_client.list_blobs(bucket_or_name=bucket_name,
                                               prefix=gcs_destination_prefix))
    all_text = ''
    for blob in blob_list:
        json_string = blob.download_as_string()
        response = json_format.Parse(
            json_string, vision.types.AnnotateFileResponse())
        # Accumulate the full text of every page response in this output file.
        for page_response in response.responses:
            text_response = page_response.full_text_annotation.text
            all_text += text_response
            all_text += ' '
    print("Parsed json doc: {}".format(doc_title))
    return all_text
def upload_blob(txt_content, destination_blob_name):
    """Uploads a file to the bucket."""
    destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_string(txt_content)
    print("Text uploaded to {}".format(destination_blob_name))
def batch_translate_text(
    input_uri="gs://YOUR_BUCKET_ID/path/to/your/file.txt",
    output_uri="gs://YOUR_BUCKET_ID/path/to/save/results/"):
    """Translates a batch of texts on GCS and stores the result in a GCS location."""
    # Supported file types: https://cloud.google.com/translate/docs/supported-formats
    gcs_source = {"input_uri": input_uri}
    input_configs_element = {
        "gcs_source": gcs_source,
        "mime_type": "text/plain"  # Can be "text/plain" or "text/html".
    }
    gcs_destination = {"output_uri_prefix": output_uri}
    output_config = {"gcs_destination": gcs_destination}
    parent = translate_client.location_path(project_id, location)
    # Supported language codes: https://cloud.google.com/translate/docs/language
    operation = translate_client.batch_translate_text(
        parent=parent,
        source_language_code="it",
        target_language_codes=["en"],  # Up to 10 language codes here.
        input_configs=[input_configs_element],
        output_config=output_config)
    response = operation.result(180)
def removePunctuation(string): 
  
    # punctuation marks 
    punctuations = '''!()-[]{};:'"\,<>./?@#$%^&*_~'''
  
    # Traverse the string and strip out any punctuation marks
    for x in string.lower(): 
        if x in punctuations: 
            string = string.replace(x, "") 
  
    # Return the string without punctuation
    return string
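# A quick illustrative check of removePunctuation on a short sample string:
# prints "COVID19 fever cough"
print(removePunctuation("COVID-19: fever, cough."))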
customize_stop_words = [
    'uoc', 'diagnostic', 'interventional', 'radiology', 'madonna', 'delle', 'grazie', 'hospital',
    'Borgheresi', 'Agostini', 'Ottaviani', 'Floridi', 'Giovagnoni', 'di', 'specialization',
    'Polytechnic', 'University', 'marche', 'ANCONA', 'Italy', 'Azienda', 'Ospedali', 
    'Riuniti', 'Yorrette', 'Matera', 'Michele', 'Nardella', 'Gerardo', 'Costanzo',
    'Claudia', 'Lopez', 'st', 'a.', 'a', 'of', 's', 'cien', 'ze', 'diolog', 'ic', 'he',
    'â', '€','s','b','case','Cuoladi','l','c','ra','bergamo','patelli','est','asst',
    'dr','Dianluigi', 'Svizzero','i','riccardo','Alessandro','Spinazzola','angelo',
    'maggiore', 'p' ,'r' ,'t', 'm', 'en', 't', 'o', 'd', 'e', 'n', 'd', 'o', 'g', 'h', 'u'
]
# Process documents
gcs_source_prefix = 'pdf'
lst_pdf_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
                                          prefix=gcs_source_prefix)
lst_json_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
                                           prefix='json')
overall_start_time = time.time()
for blob in lst_pdf_blobs:
    doc_title = blob.name.split('/')[-1].split('.pdf')[0]
    #files_metadata[doc_title] = {}
    # Generate all paths: source pdf and destination prefix for the json OCR output
    gcs_source_path = 'gs://' + bucket_name + '/' + blob.name
    json_gcs_dest_path = 'gs://' + bucket_name + '/json/' + doc_title + '-'
    #start_time = time.time()
    # OCR pdf documents
    async_detect_document(vision_client,
                          gcs_source_path,
                          json_gcs_dest_path)
print ('OCR done.')
    
for blob in lst_json_blobs:
    doc_title = blob.name.split('/')[-1].split('-')[0]
    
    json_gcs_dest_path = 'gs://' + bucket_name + '/{}'.format(blob.name)
    txt_gcs_dest_path = 'gs://' + bucket_name + '/raw_txt/' + doc_title + '.txt'
    eng_txt_gcs_dest_path = 'gs://' + bucket_name + '/eng_txt/{}/'.format(doc_title)
    processed_eng_gcs_dest_path = 'gs://' + bucket_name + '/curated_eng_txt/' + doc_title + '.txt'
    
    # Parse json
    all_text = read_json_result(json_gcs_dest_path, doc_title)
    #files_metadata[doc_title]['text'] = all_text
    # Upload raw text to GCS
    upload_blob(all_text, txt_gcs_dest_path)
    # Translate raw text to english
    batch_translate_text(input_uri = txt_gcs_dest_path,
                        output_uri = eng_txt_gcs_dest_path)
    
    # Process eng raw text
    blob_prefix = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
                                                                         bucket_name,
                                                                         doc_title)
    
    eng_blob = storage_client.get_bucket(bucket_name).get_blob(blob_prefix)
    eng_raw_string = eng_blob.download_as_string().decode('utf-8')
    
    # lowercase
    #sample_text = eng_raw_string.lower()
    # Remove dates:
    # 1- or 2-digit day, a forward slash, 1- or 2-digit month, a forward slash, and a 4-digit year
    pattern_dates = r'(\d{1,2})/(\d{1,2})/(\d{4})'
    pattern_fig = r'Figure (\d{1,2})'
    pattern_image = r'^Image .$'
    replace = ''
    eng_raw_string = re.sub(pattern_dates, replace, eng_raw_string) 
    eng_raw_string = re.sub(pattern_fig, replace, eng_raw_string)
    eng_raw_string = re.sub(pattern_image, replace, eng_raw_string) 
    
    # remove punctuation and special characters
    eng_raw_string = re.sub('[^A-Za-z0-9]+', ' ', eng_raw_string)
    # Remove custom stop words
    tokens = [token for token in eng_raw_string.split() if token not in customize_stop_words]
    refined_doc = ' '.join(tokens)
    
    # Upload curated text to GCS
    upload_blob(refined_doc, processed_eng_gcs_dest_path)
    #print('refinement completed')
    print('{} processing is done.'.format(doc_title))
def bqCreateDataset(dataset_name):
    """Create the BigQuery dataset if it does not exist and return its dataset_id."""
    dataset_ref = bq_client.dataset(dataset_name)
    try:
        return bq_client.get_dataset(dataset_ref).dataset_id
    except NotFound:
        # Dataset does not exist yet; create it.
        dataset = bigquery.Dataset(dataset_ref)
        dataset = bq_client.create_dataset(dataset)
        print('Dataset {} created.'.format(dataset.dataset_id))
        return dataset.dataset_id
def bqCreateTable(dataset_id, table_name):
    """
    Create main table with all cases and the medical text.
    return:
        table_id
    """
    dataset_ref = bq_client.dataset(dataset_id)
    # Prepares a reference to the table
    table_ref = dataset_ref.table(table_name)
    try:
        return bq_client.get_table(table_ref).table_id
    except NotFound:
        # Table does not exist yet; create it with the expected schema.
        schema = [
            bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED', 
                                 description='Output of preprocessing pipeline.')]
        table = bigquery.Table(table_ref, schema=schema)
        table = bq_client.create_table(table)
        print('table {} created.'.format(table.table_id))
        return table.table_id
def exportItems2BQ(dataset_id, table_id, case,
                   it_raw_blob, eng_raw_blob, curated_eng_blob):
    """Download the Italian raw, English raw and curated English text of a case
    from GCS and insert them as one row into the BigQuery table."""
    # Prepares a reference to the dataset
    dataset_ref = bq_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = bq_client.get_table(table_ref)  # API call
    
    # Download text from GCS
    it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
    eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
    curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
    
    rows_to_insert = [{'case': case,
                      'it_raw_txt': it_raw_txt_string,
                      'eng_raw_txt': eng_raw_txt_string,
                      'eng_txt': curated_eng_string
                     }]
    errors = bq_client.insert_rows(table, rows_to_insert)  # API request
    assert errors == []
    print('{} was added to {} dataset, specifically in {} table.'.format(case,
                                                                        dataset_id,
                                                                        table_id))
gcs_source_prefix = 'raw_txt'
dataset_id = bqCreateDataset(bq_dataset_name)
table_id = bqCreateTable(dataset_id, bq_table_name)
lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name, 
                                      prefix=gcs_source_prefix)
for blob in lst_blobs:
    doc_title = blob.name.split('/')[-1].split('.txt')[0]
    
    # download as string
    # it_raw_txt = gs://bucket_name/
    it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
    
    # eng_raw_txt = gs://covid19-aziz/text/[...]covid19-aziz_text_raw_txt_{doc_title}_en_translations.txt
    path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
                                                                              bucket_name,
                                                                              doc_title)
    eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
    
    # curated_eng_txt = gs://covid19-aziz/text/curated_eng_txt/case1.txt
    curated_eng_blob = storage_client.get_bucket(bucket_name)\
                                    .get_blob('curated_eng_txt/{}.txt'.format(doc_title))
    
    # populate to BQ dataset
    exportItems2BQ(dataset_id, table_id, doc_title,
                   it_raw_blob, eng_raw_blob, curated_eng_blob)
def returnQueryResults(bq_client, project_id, dataset_id, table_id, case_id):
    """
    Args:
        bq_client:
        project_id:
        dataset_id:
        table_id:
        case_id:
    Returns:
    """
    query = ('SELECT * FROM `{}.{}.{}` WHERE `case`="{}" LIMIT 1'.format(project_id, dataset_id, table_id, case_id))
    try:
        query_job = bq_client.query(query)
        is_exist = len(list(query_job.result())) >= 1
        logging.info('Query case id: {}'.format(case_id) if is_exist \
                         else "Case id: {} does NOT exist".format(case_id))
        print (list(query_job.result()))
    except Exception as e:
        logging.error("Error", e)
returnQueryResults(bq_client, project_id, dataset_id, table_id, 'case1')
# Load model
# en_ner_bionlp13cg_md or en_core_sci_lg
nlp = en_core_sci_lg.load()
# Add pipe features to pipeline
# The abbreviation detector is added before the linker so that
# resolve_abbreviations=True can use its output.
abbreviation_pipe = AbbreviationDetector(nlp)
nlp.add_pipe(abbreviation_pipe)
linker = UmlsEntityLinker(resolve_abbreviations=True)
nlp.add_pipe(linker)
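# Before looping over the whole corpus, a minimal sketch (the sample sentence is
# illustrative) to verify that the pipeline links entities to UMLS concepts:
sample_doc = nlp("Chest CT showed bilateral ground glass opacities.")
for ent in sample_doc.ents:
    # ent._.umls_ents is a list of (CUI, score) candidates added by the linker
    if ent._.umls_ents:
        cui, score = ent._.umls_ents[0]
        print(ent.text, cui, round(score, 2), linker.umls.cui_to_entity[cui].canonical_name)
    else:
        print(ent.text, '-> no UMLS match')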
def medicalEntityExtraction(doc):
    # Render the annotated entities inline (displacy with jupyter=True displays
    # in the notebook and returns None) and collect (text, label) pairs.
    display_text = displacy.render(doc, jupyter=True, style='ent')
    annotated_entities = set([(X.text, X.label_) for X in doc.ents])
    return display_text, annotated_entities
def addTask(client, doc_title, entities_dict):
    """Store the extracted entities of one document as a Datastore entity of kind 'case'."""
    key = client.key('case', doc_title)
    task = datastore.Entity(key=key)
    task.update(entities_dict)
    client.put(task)
    # Then get by key for this entity
    return client.get(key)
# list of blobs
gcs_source_prefix = 'curated_eng_txt'
lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name, 
                                      prefix=gcs_source_prefix)
for blob in lst_blobs:
    doc_title = blob.name.split('/')[-1].split('.txt')[0]
    
    # download as string
    eng_string = blob.download_as_string().decode('utf-8')
    
    # Run the scispacy pipeline over the document text
    doc = nlp(eng_string)
    
    # Extract medical entities and map each one to its UMLS semantic type id (TUI), e.g. T047
    pattern = r'T(\d{3})'
    UMLS_tuis_entity = {}
    entity_dict = {}
    for idx in range(len(doc.ents)):
        entity = doc.ents[idx]
        entity_dict[entity] = ''
        for umls_ent in entity._.umls_ents:
            # Keep the UMLS concept of the last linker candidate for this entity
            entity_dict[entity] = linker.umls.cui_to_entity[umls_ent[0]]
        tui = re.search(pattern, str(entity_dict[entity]))
        if tui:
            UMLS_tuis_entity[str(entity)] = tui.group()
        else:
            UMLS_tuis_entity[str(entity)] = None
            
    # generate dataframes
    entities = list(UMLS_tuis_entity.keys())
    TUIs = list(UMLS_tuis_entity.values())
    df_entities = pd.DataFrame(data={'entity':entities,'TUIs':TUIs})
    df_reference_TUIs = pd.read_csv('./data/UMLS_tuis.csv')
    df_annotated_text_entities = pd.merge(df_entities,df_reference_TUIs,how='inner',on=['TUIs'])
    
    # upload entities to datastore
    entities_dict = {}
    for idx in range(df_annotated_text_entities.shape[0]):
        
        category = df_annotated_text_entities.iloc[idx].values[2]
        
        #TUI = df_annotated_text_entities.iloc[idx].values[1]
        #entities_dict[category].append(TUI)
        
        med_entity = df_annotated_text_entities.iloc[idx].values[0]
        
        
        # Append the entity to its category list, creating the list on first use
        try:
            entities_dict[category].append(med_entity)
        except KeyError:
            entities_dict[category] = []
            entities_dict[category].append(med_entity)
        
        
    # API call: persist this document's entities in Datastore
    key = addTask(datastore_client, doc_title, entities_dict)
    print('The upload of {} entities is done.'.format(doc_title))
df_annotated_text_entities.head()
def getCases(datastore_client, filter_dict, limit=10):
    """Query Datastore for 'case' entities matching all (category, value) filters."""
    query = datastore_client.query(kind='case')
    
    for key,values in filter_dict.items():
        for value in values:
            query.add_filter(key, '=', value)
    results = list(query.fetch(limit=limit))
    return results
filter_dict = {'Sign or Symptom':['onset symptoms', "chills"]}
getCases(datastore_client,filter_dict)