from google.cloud import vision
from google.cloud import storage
from google.protobuf import json_format
from google.cloud import translate
from google.cloud import bigquery
from google.cloud import datastore
import logging
import re
import time
import pandas as pd
import numpy as np
#!sudo pip3 install scispacy
import scispacy
from spacy import displacy
#https://github.com/explosion/spacy-models/releases/download/en_core_sci_sm-2.2.0/en_core_sci_sm-2.2.0.tar.gz
#https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.2.4/en_core_sci_lg-0.2.4.tar.gz
import en_core_sci_lg # en_ner_bionlp13cg_md, en_core_sci_sm
from scispacy.umls_linking import UmlsEntityLinker
from scispacy.abbreviation import AbbreviationDetector
project_id = "pm-preparation"
location = "us-central1"
storage_client = storage.Client()
vision_client = vision.ImageAnnotatorClient()
translate_client = translate.TranslationServiceClient()
datastore_client = datastore.Client()
bq_client = bigquery.Client()
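# Note: these snippets appear to target the pre-2.0 google-cloud-vision API (vision.types / vision.enums)
# and a pre-3.0 Translation v3 client (location_path); newer library releases renamed these entry points.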
def async_detect_document(vision_client, gcs_source_uri, gcs_destination_uri):
"""OCR with PDF/TIFF as source files on GCS"""
doc_title = gcs_source_uri.split('/')[-1].split('.pdf')[0]
# Supported mime_types are: 'application/pdf' and 'image/tiff'
mime_type = 'application/pdf'
# How many pages should be grouped into each json output file.
batch_size = 20
feature = vision.types.Feature(
type=vision.enums.Feature.Type.DOCUMENT_TEXT_DETECTION)
gcs_source = vision.types.GcsSource(uri=gcs_source_uri)
input_config = vision.types.InputConfig(
gcs_source=gcs_source, mime_type=mime_type)
gcs_destination = vision.types.GcsDestination(uri=gcs_destination_uri)
output_config = vision.types.OutputConfig(
gcs_destination=gcs_destination, batch_size=batch_size)
async_request = vision.types.AsyncAnnotateFileRequest(
features=[feature], input_config=input_config,
output_config=output_config)
operation = vision_client.async_batch_annotate_files(
requests=[async_request])
#print('Waiting for the operation to finish.')
operation.result(timeout=180)
print('Text extraction from document {} is completed.'.format(doc_title))
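# Example call (hypothetical object name, same bucket/prefix layout as the loop further down):
# async_detect_document(vision_client,
#                       'gs://covid19-public-dataset-aketari/pdf/case1.pdf',
#                       'gs://covid19-public-dataset-aketari/json/case1-')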
# Once the request has completed and the output has been
# written to GCS, we can list all the output files.
def read_json_result(json_gcs_path, doc_title):
gcs_destination_prefix = 'json/' + '{}-'.format(doc_title)
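# Note: the listing below uses the module-level bucket_name; the json_gcs_path argument is not used directly.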
# List objects with the given prefix.
blob_list = list(storage_client.list_blobs(bucket_or_name=bucket_name,
prefix=gcs_destination_prefix))
all_text = ''
for blob in blob_list:
json_string = blob.download_as_string()
response = json_format.Parse(
json_string, vision.types.AnnotateFileResponse())
# Concatenate the full text annotation of every page in this output shard.
for page_response in response.responses:
text_response = page_response.full_text_annotation.text
all_text += text_response
all_text += ' '
print("Parsed json doc: {}".format(doc_title))
return all_text
def upload_blob(txt_content, destination_blob_name):
"""Uploads a file to the bucket."""
destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(txt_content)
print("Text uploaded to {}".format(destination_blob_name))
def batch_translate_text(
input_uri="gs://YOUR_BUCKET_ID/path/to/your/file.txt",
output_uri="gs://YOUR_BUCKET_ID/path/to/save/results/"):
"""Translates a batch of texts on GCS and stores the result in a GCS location."""
# Supported file types: https://cloud.google.com/translate/docs/supported-formats
gcs_source = {"input_uri": input_uri}
input_configs_element = {
"gcs_source": gcs_source,
"mime_type": "text/plain" # Can be "text/plain" or "text/html".
}
gcs_destination = {"output_uri_prefix": output_uri}
output_config = {"gcs_destination": gcs_destination}
parent = translate_client.location_path(project_id, location)
# Supported language codes: https://cloud.google.com/translate/docs/language
operation = translate_client.batch_translate_text(
parent=parent,
source_language_code="it",
target_language_codes=["en"], # Up to 10 language codes here.
input_configs=[input_configs_element],
output_config=output_config)
response = operation.result(180)
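# The batch job writes its results under output_uri; the translated file name encodes the source bucket,
# the source object path and the target language code, which is how it is located again in the
# processing loop below (see blob_prefix).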
def removePunctuation(string):
# punctuation marks
punctuations = '''!()-[]{};:'"\,<>./?@#$%^&*_~'''
# traverse the given string and if any punctuation
# marks occur replace it with null
for x in string.lower():
if x in punctuations:
string = string.replace(x, "")
# Print string without punctuation
return string
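# Quick sanity check of the helper above:
# removePunctuation("COVID-19, (suspected) case.")  ->  'COVID19 suspected case'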
The document-processing step below takes approximately 1 hour and 20 minutes.
customize_stop_words = [
'uoc', 'diagnostic', 'interventional', 'radiology', 'madonna', 'delle', 'grazie', 'hospital',
'Borgheresi', 'Agostini', 'Ottaviani', 'Floridi', 'Giovagnoni', 'di', 'specialization',
'Polytechnic', 'University', 'marche', 'ANCONA', 'Italy', 'Azienda', 'Ospedali',
'Riuniti', 'Yorrette', 'Matera', 'Michele', 'Nardella', 'Gerardo', 'Costanzo',
'Claudia', 'Lopez', 'st', 'a.', 'a', 'of', 's', 'cien', 'ze', 'diolog', 'ic', 'he',
'â', '€','s','b','case','Cuoladi','l','c','ra','bergamo','patelli','est','asst',
'dr','Dianluigi', 'Svizzero','i','riccardo','Alessandro','Spinazzola','angelo',
'maggiore', 'p' ,'r' ,'t', 'm', 'en', 't', 'o', 'd', 'e', 'n', 'd', 'o', 'g', 'h', 'u'
]
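# These stop words are specific to this document set: author and institution names plus
# short OCR/translation fragments observed in the translated text.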
# Process documents
bucket_name = 'covid19-public-dataset-aketari'
gcs_source_prefix = 'pdf'
lst_pdf_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
prefix=gcs_source_prefix)
lst_json_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
prefix='json')
overall_start_time = time.time()
for blob in lst_pdf_blobs:
doc_title = blob.name.split('/')[-1].split('.pdf')[0]
#files_metadata[doc_title] = {}
# Generate source and destination paths (the JSON output prefix matches what read_json_result expects)
gcs_source_path = 'gs://' + bucket_name + '/' + blob.name
json_gcs_dest_path = 'gs://' + bucket_name + '/json/{}-'.format(doc_title)
#start_time = time.time()
# OCR pdf documents
async_detect_document(vision_client,
gcs_source_path,
json_gcs_dest_path)
print ('OCR done.')
for blob in lst_json_blobs:
doc_title = blob.name.split('/')[-1].split('-')[0]
json_gcs_dest_path = 'gs://' + bucket_name + '/{}'.format(blob.name)
txt_gcs_dest_path = 'gs://' + bucket_name + '/raw_txt/' + doc_title + '.txt'
eng_txt_gcs_dest_path = 'gs://' + bucket_name + '/eng_txt/{}/'.format(doc_title)
processed_eng_gcs_dest_path = 'gs://' + bucket_name + '/curated_eng_txt/' + doc_title + '.txt'
# Parse json
all_text = read_json_result(json_gcs_dest_path, doc_title)
#files_metadata[doc_title]['text'] = all_text
# Upload raw text to GCS
upload_blob(all_text, txt_gcs_dest_path)
# Translate raw text to english
batch_translate_text(input_uri = txt_gcs_dest_path,
output_uri = eng_txt_gcs_dest_path)
# Process eng raw text
blob_prefix = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
bucket_name,
doc_title)
eng_blob = storage_client.get_bucket(bucket_name).get_blob(blob_prefix)
eng_raw_string = eng_blob.download_as_string().decode('utf-8')
# lowercase
#sample_text = eng_raw_string.lower()
# Remove dates
# 1- or 2-digit day, a forward slash, 1- or 2-digit month, a forward slash, and a 4-digit year
pattern_dates = r'(\d{1,2})/(\d{1,2})/(\d{4})'
pattern_fig = r'Figure (\d{1,2})'
pattern_image = r'^Image .$'
replace = ''
eng_raw_string = re.sub(pattern_dates, replace, eng_raw_string)
eng_raw_string = re.sub(pattern_fig, replace, eng_raw_string)
eng_raw_string = re.sub(pattern_image, replace, eng_raw_string)
# remove punctuation and special characters
eng_raw_string = re.sub('[^A-Za-z0-9]+', ' ', eng_raw_string)
# Remove custom stop words
tokens = [token for token in eng_raw_string.split() if token not in customize_stop_words]
refined_doc = ' '.join(tokens)
# Upload curated English text to GCS
upload_blob(refined_doc, processed_eng_gcs_dest_path)
#print('refinement completed')
print('{} processing is done.'.format(doc_title))
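# At this point each case has three text artifacts on GCS: raw_txt/ (Italian OCR output),
# eng_txt/ (machine translation) and curated_eng_txt/ (cleaned English text).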
def bqCreateDataset(dataset_name):
dataset_ref = bq_client.dataset(dataset_name)
try:
return bq_client.get_dataset(dataset_ref).dataset_id
except Exception:  # dataset not found; create it
dataset = bigquery.Dataset(dataset_ref)
dataset = bq_client.create_dataset(dataset)
print('Dataset {} created.'.format(dataset.dataset_id))
return dataset.dataset_id
def bqCreateTable(dataset_id, table_name):
"""
Create the main table holding all cases and their medical text.
Returns:
table_id
"""
dataset_ref = bq_client.dataset(dataset_id)
# Prepares a reference to the table
table_ref = dataset_ref.table(table_name)
try:
return bq_client.get_table(table_ref).table_id
except Exception:  # table not found; create it
schema = [
bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
description='Output of preprocessing pipeline.')]
table = bigquery.Table(table_ref, schema=schema)
table = bq_client.create_table(table)
print('table {} created.'.format(table.table_id))
return table.table_id
def exportItems2BQ(dataset_id, table_id, case,
it_raw_blob, eng_raw_blob, curated_eng_blob):
# Prepares a reference to the dataset
dataset_ref = bq_client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
table = bq_client.get_table(table_ref) # API call
# Download text from GCS
it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
rows_to_insert = [{'case': case,
'it_raw_txt': it_raw_txt_string,
'eng_raw_txt': eng_raw_txt_string,
'eng_txt': curated_eng_string
}]
errors = bq_client.insert_rows(table, rows_to_insert) # API request
assert errors == []
print('{} was added to {} dataset, specifically in {} table.'.format(case,
dataset_id,
table_id))
bucket_name = 'covid19-public-dataset-aketari'
gcs_source_prefix = 'raw_txt'
dataset_id = bqCreateDataset('covid19')
table_id = bqCreateTable(dataset_id, 'ISMIR_cases')
lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
prefix=gcs_source_prefix)
for blob in lst_blobs:
doc_title = blob.name.split('/')[-1].split('.txt')[0]
# download as string
# it_raw_txt = gs://{bucket_name}/raw_txt/{doc_title}.txt
it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
# eng_raw_txt = gs://covid19-aziz/text/[...]covid19-aziz_text_raw_txt_{doc_title}_en_translations.txt
path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
bucket_name,
doc_title)
eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
# curated_eng_txt = gs://covid19-aziz/text/curated_eng_txt/case1.txt
curated_eng_blob = storage_client.get_bucket(bucket_name)\
.get_blob('curated_eng_txt/{}.txt'.format(doc_title))
# populate to BQ dataset
exportItems2BQ(dataset_id, table_id, doc_title,
it_raw_blob, eng_raw_blob, curated_eng_blob)
def returnQueryResults(bq_client, project_id, dataset_id, table_id, case_id):
"""
Args:
bq_client:
project_id:
dataset_id:
table_id:
case_id:
Returns:
"""
query = ('SELECT * FROM `{}.{}.{}` WHERE `case`="{}" LIMIT 1'.format(project_id, dataset_id, table_id, case_id))
try:
query_job = bq_client.query(query)
is_exist = len(list(query_job.result())) >= 1
logging.info('Query case id: {}'.format(case_id) if is_exist \
else "Case id: {} does NOT exist".format(case_id))
print (list(query_job.result()))
except Exception as e:
logging.error("Error: %s", e)
returnQueryResults(bq_client, project_id, 'covid19', 'ISMIR_cases', 'case1')
# https://www.kdnuggets.com/2019/04/text-preprocessing-nlp-machine-learning.html
# Load model
# en_ner_bionlp13cg_md or en_core_sci_lg
#nlp = spacy.load("en_core_sci_lg")
nlp = en_core_sci_lg.load()
# Add pipe features to pipeline
linker = UmlsEntityLinker(resolve_abbreviations=True)
nlp.add_pipe(linker)
# Add the abbreviation pipe to the spacy pipeline.
abbreviation_pipe = AbbreviationDetector(nlp)
nlp.add_pipe(abbreviation_pipe)
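# Note: add_pipe(component) with component objects is the spaCy 2.x / scispacy 0.2.x API;
# spaCy 3 registers pipeline components by name instead.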
def medicalEntityExtraction(doc):
# Render the annotated entities with displaCy (for notebook display)
display_text = displacy.render(doc,jupyter=True,style='ent')
annotated_entities = set([(X.text, X.label_) for X in doc.ents])
return display_text, annotated_entities
def addTask(client, entities_dict):
key = client.key('case', doc_title)
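# Note: doc_title comes from the enclosing processing loop (module scope), not from an argument.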
task = datastore.Entity(key=key)
task.update(
entities_dict)
client.put(task)
# Then get by key for this entity
return client.get(key)
# list of blobs
bucket_name = 'covid19-public-dataset-aketari'
gcs_source_prefix = 'curated_eng_txt'
lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
prefix=gcs_source_prefix)
for blob in lst_blobs:
doc_title = blob.name.split('/')[-1].split('.txt')[0]
# download as string
eng_string = blob.download_as_string().decode('utf-8')
# Run the scispaCy pipeline on the curated English text
doc = nlp(eng_string)
# Extract medical entities and their UMLS semantic type identifiers (TUIs, e.g. T047)
pattern = r'T(\d{3})'
UMLS_tuis_entity = {}
entity_dict = {}
for idx in range(len(doc.ents)):
entity = doc.ents[idx]
entity_dict[entity] = ''
for umls_ent in entity._.umls_ents:
entity_dict[entity] = linker.umls.cui_to_entity[umls_ent[0]]
tui = re.search(pattern, str(entity_dict[entity]))
if tui:
UMLS_tuis_entity[str(entity)] = tui.group()
else:
UMLS_tuis_entity[str(entity)] = None
# generate dataframes
entities = list(UMLS_tuis_entity.keys())
TUIs = list(UMLS_tuis_entity.values())
df_entities = pd.DataFrame(data={'entity':entities,'TUIs':TUIs})
df_reference_TUIs = pd.read_csv('./data/UMLS_tuis.csv')
df_annotated_text_entities = pd.merge(df_entities,df_reference_TUIs,how='inner',on=['TUIs'])
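# The inner join keeps only entities whose TUI appears in the UMLS reference table and attaches
# the semantic-type category, which becomes the Datastore property name below.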
# upload entities to datastore
entities_dict = {}
for idx in range(df_annotated_text_entities.shape[0]):
category = df_annotated_text_entities.iloc[idx].values[2]
#TUI = df_annotated_text_entities.iloc[idx].values[1]
#entities_dict[category].append(TUI)
med_entity = df_annotated_text_entities.iloc[idx].values[0]
# Append to the category's entity list, creating the list on first occurrence
try:
entities_dict[category].append(med_entity)
except KeyError:
entities_dict[category] = []
entities_dict[category].append(med_entity)
# API call: store this document's entities in Datastore
stored_entity = addTask(datastore_client, entities_dict)
print('The upload of {} entities is done.'.format(doc_title))
df_annotated_text_entities.head()
def getCases(datastore_client, filter_dict, limit=10):
query = datastore_client.query(kind='case')
for key,values in filter_dict.items():
for value in values:
query.add_filter(key, '=', value)
results = list(query.fetch(limit=limit))
return results
filter_dict = {'Sign or Symptom':['onset symptoms', "chills"]}
getCases(datastore_client, filter_dict)
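# Another example (hypothetical category/values, same query pattern):
# getCases(datastore_client, {'Disease or Syndrome': ['pneumonia']}, limit=5)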