from google.cloud import vision
from google.cloud import storage
from google.protobuf import json_format
from google.cloud import translate
from google.cloud import bigquery
from google.cloud import datastore
from google.cloud.exceptions import NotFound
import logging
import re
import time
import pandas as pd
import numpy as np
#!sudo pip3 install scispacy
import scispacy
from spacy import displacy
#https://github.com/explosion/spacy-models/releases/download/en_core_sci_sm-2.2.0/en_core_sci_sm-2.2.0.tar.gz
#https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.2.4/en_core_sci_lg-0.2.4.tar.gz
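# The en_core_sci_lg model referenced above is assumed to be installed from that URL, e.g.:
#!sudo pip3 install https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.2.4/en_core_sci_lg-0.2.4.tar.gz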
import en_core_sci_lg # en_ner_bionlp13cg_md, en_core_sci_sm
from scispacy.umls_linking import UmlsEntityLinker
from scispacy.abbreviation import AbbreviationDetector
Note:
This notebook was run on an AI Platform Notebooks instance. If you run it on your local machine, you need to provide service account credentials to authenticate the API calls, as sketched below.
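A minimal sketch (the key file path is a placeholder, not from the original notebook): point GOOGLE_APPLICATION_CREDENTIALS at a service account key that has access to the APIs used here, before the clients are instantiated.
#import os
#os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account-key.json'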
# TODO: replace with your settings
project_id = "your_project_id"
location = "us-central1"
bucket_name = 'bucket_name'
bq_dataset_name = 'name_of_your_choice'
bq_table_name = 'name_of_your_choice_2'
storage_client = storage.Client()
vision_client = vision.ImageAnnotatorClient()
translate_client = translate.TranslationServiceClient()
datastore_client = datastore.Client()
bq_client = bigquery.Client()
def async_detect_document(vision_client, gcs_source_uri, gcs_destination_uri):
"""OCR with PDF/TIFF as source files on GCS"""
doc_title = gcs_source_uri.split('/')[-1].split('.pdf')[0]
# Supported mime_types are: 'application/pdf' and 'image/tiff'
mime_type = 'application/pdf'
# How many pages should be grouped into each json output file.
batch_size= 20
feature = vision.types.Feature(
type=vision.enums.Feature.Type.DOCUMENT_TEXT_DETECTION)
gcs_source = vision.types.GcsSource(uri=gcs_source_uri)
input_config = vision.types.InputConfig(
gcs_source=gcs_source, mime_type=mime_type)
gcs_destination = vision.types.GcsDestination(uri=gcs_destination_uri)
output_config = vision.types.OutputConfig(
gcs_destination=gcs_destination, batch_size=batch_size)
async_request = vision.types.AsyncAnnotateFileRequest(
features=[feature], input_config=input_config,
output_config=output_config)
operation = vision_client.async_batch_annotate_files(
requests=[async_request])
#print('Waiting for the operation to finish.')
operation.result(timeout=180)
print('Text extraction from document {} is completed.'.format(doc_title))
# Once the request has completed and the output has been
# written to GCS, we can list all the output files.
def read_json_result(json_gcs_path, doc_title):
gcs_destination_prefix = 'json/' + '{}-'.format(doc_title)
# List objects with the given prefix.
blob_list = list(storage_client.list_blobs(bucket_or_name=bucket_name,
prefix=gcs_destination_prefix))
all_text = ''
for blob in blob_list:
json_string = blob.download_as_string()
response = json_format.Parse(
json_string, vision.types.AnnotateFileResponse())
        # Concatenate the full-text annotation of every page in this JSON output file
        for page_response in response.responses:
            all_text += page_response.full_text_annotation.text
            all_text += ' '
print("Parsed json doc: {}".format(doc_title))
return all_text
def upload_blob(txt_content, destination_blob_name):
"""Uploads a file to the bucket."""
destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(txt_content)
print("Text uploaded to {}".format(destination_blob_name))
def batch_translate_text(
input_uri="gs://YOUR_BUCKET_ID/path/to/your/file.txt",
output_uri="gs://YOUR_BUCKET_ID/path/to/save/results/"):
"""Translates a batch of texts on GCS and stores the result in a GCS location."""
# Supported file types: https://cloud.google.com/translate/docs/supported-formats
gcs_source = {"input_uri": input_uri}
input_configs_element = {
"gcs_source": gcs_source,
"mime_type": "text/plain" # Can be "text/plain" or "text/html".
}
gcs_destination = {"output_uri_prefix": output_uri}
output_config = {"gcs_destination": gcs_destination}
parent = translate_client.location_path(project_id, location)
# Supported language codes: https://cloud.google.com/translate/docs/language
operation = translate_client.batch_translate_text(
parent=parent,
source_language_code="it",
target_language_codes=["en"], # Up to 10 language codes here.
input_configs=[input_configs_element],
output_config=output_config)
response = operation.result(180)
def removePunctuation(string):
    """Remove punctuation marks and special characters from a string."""
    punctuations = r'''!()-[]{};:'"\,<>./?@#$%^&*_~'''
    # Traverse the string and drop any punctuation mark that occurs
    for x in string.lower():
        if x in punctuations:
            string = string.replace(x, "")
    return string
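# Illustrative example (not part of the original pipeline):
# removePunctuation("COVID-19, chest CT.") -> "COVID19 chest CT"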
customize_stop_words = [
'uoc', 'diagnostic', 'interventional', 'radiology', 'madonna', 'delle', 'grazie', 'hospital',
'Borgheresi', 'Agostini', 'Ottaviani', 'Floridi', 'Giovagnoni', 'di', 'specialization',
'Polytechnic', 'University', 'marche', 'ANCONA', 'Italy', 'Azienda', 'Ospedali',
'Riuniti', 'Yorrette', 'Matera', 'Michele', 'Nardella', 'Gerardo', 'Costanzo',
'Claudia', 'Lopez', 'st', 'a.', 'a', 'of', 's', 'cien', 'ze', 'diolog', 'ic', 'he',
'â', '€','s','b','case','Cuoladi','l','c','ra','bergamo','patelli','est','asst',
'dr','Dianluigi', 'Svizzero','i','riccardo','Alessandro','Spinazzola','angelo',
'maggiore', 'p' ,'r' ,'t', 'm', 'en', 't', 'o', 'd', 'e', 'n', 'd', 'o', 'g', 'h', 'u'
]
# Process documents
gcs_source_prefix = 'pdf'
lst_pdf_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
                                          prefix=gcs_source_prefix)
lst_json_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
prefix='json')
overall_start_time = time.time()
for blob in lst_pdf_blobs:
doc_title = blob.name.split('/')[-1].split('.pdf')[0]
#files_metadata[doc_title] = {}
# Generate all paths
gcs_source_path = 'gs://' + bucket_name +'/' + blob.name
#start_time = time.time()
# OCR pdf documents
    # Destination prefix for the OCR JSON output; matches the 'json/{doc_title}-' prefix parsed later
    json_gcs_dest_path = 'gs://' + bucket_name + '/json/' + doc_title + '-'
    async_detect_document(vision_client,
                          gcs_source_path,
                          json_gcs_dest_path)
print ('OCR done.')
for blob in lst_json_blobs:
doc_title = blob.name.split('/')[-1].split('-')[0]
json_gcs_dest_path = 'gs://' + bucket_name + '/{}'.format(blob.name)
txt_gcs_dest_path = 'gs://' + bucket_name + '/raw_txt/' + doc_title + '.txt'
eng_txt_gcs_dest_path = 'gs://' + bucket_name + '/eng_txt/{}/'.format(doc_title)
processed_eng_gcs_dest_path = 'gs://' + bucket_name + '/curated_eng_txt/' + doc_title + '.txt'
# Parse json
all_text = read_json_result(json_gcs_dest_path, doc_title)
#files_metadata[doc_title]['text'] = all_text
# Upload raw text to GCS
upload_blob(all_text, txt_gcs_dest_path)
# Translate raw text to english
batch_translate_text(input_uri = txt_gcs_dest_path,
output_uri = eng_txt_gcs_dest_path)
    # Process the translated English text; the batch translation job names its output file
    # after the flattened input URI and the target language code
blob_prefix = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
bucket_name,
doc_title)
eng_blob = storage_client.get_bucket(bucket_name).get_blob(blob_prefix)
eng_raw_string = eng_blob.download_as_string().decode('utf-8')
# lowercase
#sample_text = eng_raw_string.lower()
    # Remove dates: 1- or 2-digit day and month separated by forward slashes, then a 4-digit year
    pattern_dates = r'(\d{1,2})/(\d{1,2})/(\d{4})'
    # Remove figure/image references
    pattern_fig = r'Figure (\d{1,2})'
    pattern_image = r'^Image .$'
replace = ''
eng_raw_string = re.sub(pattern_dates, replace, eng_raw_string)
eng_raw_string = re.sub(pattern_fig, replace, eng_raw_string)
eng_raw_string = re.sub(pattern_image, replace, eng_raw_string)
# remove punctuation and special characters
eng_raw_string = re.sub('[^A-Za-z0-9]+', ' ', eng_raw_string)
# Remove custom stop words
    tokens = [token for token in eng_raw_string.split() if token not in customize_stop_words]
    refined_doc = ' '.join(tokens)
    # Upload the curated English text to GCS
upload_blob(refined_doc, processed_eng_gcs_dest_path)
#print('refinement completed')
print('{} processing is done.'.format(doc_title))
def bqCreateDataset(dataset_name):
dataset_ref = bq_client.dataset(dataset_name)
try:
return bq_client.get_dataset(dataset_ref).dataset_id
    except NotFound:
dataset = bigquery.Dataset(dataset_ref)
dataset = bq_client.create_dataset(dataset)
print('Dataset {} created.'.format(dataset.dataset_id))
return dataset.dataset_id
def bqCreateTable(dataset_id, table_name):
    """Create the main table holding each case and its medical text.

    Returns:
        table_id
    """
dataset_ref = bq_client.dataset(dataset_id)
# Prepares a reference to the table
table_ref = dataset_ref.table(table_name)
try:
return bq_client.get_table(table_ref).table_id
    except NotFound:
schema = [
bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
description='Output of preprocessing pipeline.')]
table = bigquery.Table(table_ref, schema=schema)
table = bq_client.create_table(table)
print('table {} created.'.format(table.table_id))
return table.table_id
def exportItems2BQ(dataset_id, table_id, case,
it_raw_blob, eng_raw_blob, curated_eng_blob):
# Prepares a reference to the dataset
dataset_ref = bq_client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
table = bq_client.get_table(table_ref) # API call
# Download text from GCS
it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
rows_to_insert = [{'case': case,
'it_raw_txt': it_raw_txt_string,
'eng_raw_txt': eng_raw_txt_string,
'eng_txt': curated_eng_string
}]
errors = bq_client.insert_rows(table, rows_to_insert) # API request
assert errors == []
print('{} was added to {} dataset, specifically in {} table.'.format(case,
dataset_id,
table_id))
gcs_source_prefix = 'raw_txt'
dataset_id = bqCreateDataset(bq_dataset_name)
table_id = bqCreateTable(dataset_id, bq_table_name)
lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
prefix=gcs_source_prefix)
for blob in lst_blobs:
doc_title = blob.name.split('/')[-1].split('.txt')[0]
# download as string
# it_raw_txt = gs://bucket_name/
it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
# eng_raw_txt = gs://covid19-aziz/text/[...]covid19-aziz_text_raw_txt_{doc_title}_en_translations.txt
path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
bucket_name,
doc_title)
eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
# curated_eng_txt = gs://covid19-aziz/text/curated_eng_txt/case1.txt
curated_eng_blob = storage_client.get_bucket(bucket_name)\
.get_blob('curated_eng_txt/{}.txt'.format(doc_title))
# populate to BQ dataset
exportItems2BQ(dataset_id, table_id, doc_title,
it_raw_blob, eng_raw_blob, curated_eng_blob)
def returnQueryResults(bq_client, project_id, dataset_id, table_id, case_id):
    """Query the BigQuery table for a given case id and log whether it exists.

    Args:
        bq_client: BigQuery client.
        project_id: GCP project id.
        dataset_id: BigQuery dataset id.
        table_id: BigQuery table id.
        case_id: value of the `case` column to look up.
    """
query = ('SELECT * FROM `{}.{}.{}` WHERE `case`="{}" LIMIT 1'.format(project_id, dataset_id, table_id, case_id))
try:
query_job = bq_client.query(query)
is_exist = len(list(query_job.result())) >= 1
logging.info('Query case id: {}'.format(case_id) if is_exist \
else "Case id: {} does NOT exist".format(case_id))
print (list(query_job.result()))
except Exception as e:
        logging.error("Error: %s", e)
returnQueryResults(bq_client, project_id, 'covid19', 'ISMIR_cases', 'case1')
# Load model
# en_ner_bionlp13cg_md or en_core_sci_lg
nlp = en_core_sci_lg.load()
# Add pipe features to pipeline
linker = UmlsEntityLinker(resolve_abbreviations=True)
nlp.add_pipe(linker)
# Add the abbreviation pipe to the spacy pipeline.
abbreviation_pipe = AbbreviationDetector(nlp)
nlp.add_pipe(abbreviation_pipe)
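# Optional sanity check (illustrative, not from the original notebook): run the pipeline on a
# short sentence and inspect the detected entities with their top UMLS candidate (CUI, score).
#sample_doc = nlp("The patient presented with fever and bilateral ground-glass opacities.")
#print([(ent.text, ent._.umls_ents[:1]) for ent in sample_doc.ents])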
def medicalEntityExtraction(doc):
    """Render the detected entities in the notebook and return them as (text, label) pairs."""
    display_text = displacy.render(doc, jupyter=True, style='ent')
    annotated_entities = set([(X.text, X.label_) for X in doc.ents])
    return display_text, annotated_entities
def addTask(client, entities_dict):
    # Relies on the global doc_title (set in the processing loop below) as the Datastore key name
    key = client.key('case', doc_title)
    task = datastore.Entity(key=key)
task.update(
entities_dict)
client.put(task)
# Then get by key for this entity
return client.get(key)
# list of blobs
gcs_source_prefix = 'curated_eng_txt'
lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
prefix=gcs_source_prefix)
for blob in lst_blobs:
doc_title = blob.name.split('/')[-1].split('.txt')[0]
# download as string
eng_string = blob.download_as_string().decode('utf-8')
    # Run the scispaCy pipeline (tokenization, NER, abbreviation detection, UMLS linking)
    doc = nlp(eng_string)
    # Extract medical entities and their UMLS semantic type ids (TUIs, e.g. 'T047')
    pattern = r'T(\d{3})'
    UMLS_tuis_entity = {}
entity_dict = {}
    for entity in doc.ents:
entity_dict[entity] = ''
for umls_ent in entity._.umls_ents:
entity_dict[entity] = linker.umls.cui_to_entity[umls_ent[0]]
tui = re.search(pattern, str(entity_dict[entity]))
if tui:
UMLS_tuis_entity[str(entity)] = tui.group()
else:
UMLS_tuis_entity[str(entity)] = None
# generate dataframes
entities = list(UMLS_tuis_entity.keys())
TUIs = list(UMLS_tuis_entity.values())
df_entities = pd.DataFrame(data={'entity':entities,'TUIs':TUIs})
df_reference_TUIs = pd.read_csv('./data/UMLS_tuis.csv')
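    # UMLS_tuis.csv is assumed to map each TUI code to a human-readable semantic-type label,
    # which the inner merge below attaches to every extracted entity.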
df_annotated_text_entities = pd.merge(df_entities,df_reference_TUIs,how='inner',on=['TUIs'])
# upload entities to datastore
entities_dict = {}
for idx in range(df_annotated_text_entities.shape[0]):
category = df_annotated_text_entities.iloc[idx].values[2]
#TUI = df_annotated_text_entities.iloc[idx].values[1]
#entities_dict[category].append(TUI)
med_entity = df_annotated_text_entities.iloc[idx].values[0]
        # Append to the category's entity list, creating it on first occurrence
        entities_dict.setdefault(category, []).append(med_entity)
# API call
key = addTask(datastore_client, entities_dict)
print('The upload of {} entities is done.'.format(doc_title))
df_annotated_text_entities.head()
def getCases(datastore_client, filter_dict, limit=10):
query = datastore_client.query(kind='case')
for key,values in filter_dict.items():
for value in values:
query.add_filter(key, '=', value)
results = list(query.fetch(limit=limit))
return results
filter_dict = {'Sign or Symptom':['onset symptoms', "chills"]}
getCases(datastore_client,filter_dict)