Aziz Ketari hace 4 años
commit
e1f8d8e7c1
Se han modificado 16 ficheros con 967 adiciones y 0 borrados
  1. BIN
      .DS_Store
  2. 94 0
      README.md
  3. BIN
      data/.DS_Store
  4. 134 0
      data/UMLS_tuis.csv
  5. 2 0
      data/download_content.sh
  6. 7 0
      env_variables.sh
  7. 63 0
      extraction.py
  8. 88 0
      preprocessing.py
  9. 9 0
      requirements.txt
  10. 40 0
      retrieving.py
  11. 118 0
      storing.py
  12. 0 0
      utils/__init__.py
  13. 128 0
      utils/bq_fcn.py
  14. 9 0
      utils/config.py
  15. 120 0
      utils/ner_fcn.py
  16. 155 0
      utils/preprocessing_fcn.py

BIN
.DS_Store


+ 94 - 0
README.md

@@ -0,0 +1,94 @@
+# COVID-19 public dataset on GCP from cases in Italy
+> by Italian Society of Medical and Interventional Radiology (ISMIR)
+
+This repository contains all the code required to extract relevant information from pdf documents published by ISMIR and store raw data in  a relational database and entities in a No-SQL database.
+
+In particular, you will use Google Cloud Vision API and Translation API, before storing the information on BigQuery API. Separately, you will also use specific NER models (from Scispacy) to extract (medical) domain specific entities and store them in a NoSQL db (namely Datastore) on Google Cloud Platform.
+
+Looking for more context behind this dataset? Check out this article.
+
+---
+
+## Installation
+**Requirements:**
+- Clone this repo to your local machine using https://github.com/
+- You need a Google Cloud project and IAM rights to create service accounts.
+- Enable APIs
+```
+gcloud services enable vision.googleapis.com
+gcloud services enable translate.googleapis.com
+gcloud services enable datastore.googleapis.com
+gcloud services enable bigquery.googleapis.com
+```
+
+- Install package requirements:
+
+```pip install -r requirements.txt```
+
+
+Note:
+
+You will also need to download a NER model for the second part of this pipeline. See Scispacy full selection of available models [here]('https://github.com/allenai/scispacy'). If you follow this installation guide, the steps will automatically download a model for you and install it.
+
+
+## Extracting data
+- **Step 1:** Assign the values to each variables in env_variables.sh file
+
+```
+cd ~/covid19
+./env_variables.sh
+```
+
+- **Step 2:** Download the required files to your bucket and load the required model in your local  (this step will take ~10 min)
+
+```
+sh ~/data/download_content.sh
+pip install -U ./scispacy_models/en_core_sci_lg-0.2.4.tar.gz
+```
+
+- **Step 3:** Start the extraction of text from the pdf documents  
+
+`python3 extraction.py`
+
+## Pre-processing data
+Following the extraction of text, it's time to translate it from Italian to English and curate it.
+
+`python3 preprocessing.py`
+
+## Storing data
+Following the pre-processing, it's time to store the data in a more searchable format: a data warehouse - [BigQuery](https://cloud.google.com/bigquery) - for the text, and a No-SQL database - [Datastore](https://cloud.google.com/datastore) - for the (UMLS) medical entities. 
+
+`python3 storing.py`
+
+## Test
+Last but not least, you can query your databases using this script.
+
+`python3 retrieving.py`
+
+## Contributing
+> To get started...
+
+### Step 1
+- **Option 1**
+    - 🍴 Fork this repo!    
+
+- **Option 2**
+    - 👯 Clone this repo to your local machine using https://github.com/joanaz/HireDot2.git
+    
+### Step 2
+- **HACK AWAY!** 🔨🔨🔨
+
+### Step 3
+- 🔃 Create a new pull request using https://github.com/joanaz/HireDot2/compare/.
+
+---
+
+### Citing
+
+- [ScispaCy: Fast and Robust Models for Biomedical Natural Language Processing by Mark Neumann and Daniel King and Iz Beltagy and Waleed Ammar
+  (2019)](https://www.semanticscholar.org/paper/ScispaCy%3A-Fast-and-Robust-Models-for-Biomedical-Neumann-King/de28ec1d7bd38c8fc4e8ac59b6133800818b4e29)
+  
+## License
+[![License](http://img.shields.io/:license-mit-blue.svg?style=flat-square)](http://badges.mit-license.org)
+
+- [MIT License](https://opensource.org/licenses/mit-license.php)

BIN
data/.DS_Store


+ 134 - 0
data/UMLS_tuis.csv

@@ -0,0 +1,134 @@
+TUIs,Categories
+T116,"Amino Acid, Peptide, or Protein"
+T020,Acquired Abnormality
+T052,Activity
+T100,Age Group
+T087,Amino Acid Sequence
+T011,Amphibian
+T190,Anatomical Abnormality
+T008,Animal
+T017,Anatomical Structure
+T195,Antibiotic
+T194,Archaeon
+T123,Biologically Active Substance
+T007,Bacterium
+T031,Body Substance
+T022,Body System
+T053,Behavior
+T038,Biologic Function
+T012,Bird
+T029,Body Location or Region
+T091,Biomedical Occupation or Discipline
+T122,Biomedical or Dental Material
+T023,"Body Part, Organ, or Organ Component"
+T030,Body Space or Junction
+T118,Carbohydrate
+T026,Cell Component
+T043,Cell Function
+T025,Cell
+T019,Congenital Abnormality
+T103,Chemical
+T120,Chemical Viewed Functionally
+T104,Chemical Viewed Structurally
+T185,Classification
+T201,Clinical Attribute
+T200,Clinical Drug
+T077,Conceptual Entity
+T049,Cell or Molecular Dysfunction
+T088,Carbohydrate Sequence
+T060,Diagnostic Procedure
+T056,Daily or Recreational Activity
+T203,Drug Delivery Device
+T047,Disease or Syndrome
+T065,Educational Activity
+T069,Environmental Effect of Humans
+T111,Eicosanoid
+T196,"Element, Ion, or Isotope"
+T050,Experimental Model of Disease
+T018,Embryonic Structure
+T071,Entity
+T126,Enzyme
+T204,Eukaryote
+T051,Event
+T099,Family Group
+T021,Fully Formed Anatomical Structure
+T013,Fish
+T033,Finding
+T004,Fungus
+T168,Food
+T169,Functional Concept
+T045,Genetic Function
+T083,Geographic Area
+T028,Gene or Genome
+T064,Governmental or Regulatory Activity
+T102,Group Attribute
+T096,Group
+T068,Human-caused Phenomenon or Process
+T093,Health Care Related Organization
+T058,Health Care Activity
+T131,Hazardous or Poisonous Substance
+T125,Hormone
+T016,Human
+T078,Idea or Concept
+T129,Immunologic Factor
+T055,Individual Behavior
+T197,Inorganic Chemical
+T037,Injury or Poisoning
+T170,Intellectual Product
+T130,"Indicator, Reagent, or Diagnostic Aid"
+T171,Language
+T059,Laboratory Procedure
+T034,Laboratory or Test Result
+T119,Lipid
+T015,Mammal
+T063,Molecular Biology Research Technique
+T066,Machine Activity
+T074,Medical Device
+T041,Mental Process
+T073,Manufactured Object
+T048,Mental or Behavioral Dysfunction
+T044,Molecular Function
+T085,Molecular Sequence
+T191,Neoplastic Process
+T114,"Nucleic Acid, Nucleoside, or Nucleotide"
+T070,Natural Phenomenon or Process
+T124,Neuroreactive Substance or Biogenic Amine
+T086,Nucleotide Sequence
+T057,Occupational Activity
+T090,Occupation or Discipline
+T115,Organophosphorus Compound
+T109,Organic Chemical
+T032,Organism Attribute
+T040,Organism Function
+T001,Organism
+T092,Organization
+T042,Organ or Tissue Function
+T046,Pathologic Function
+T072,Physical Object
+T067,Phenomenon or Process
+T039,Physiologic Function
+T121,Pharmacologic Substance
+T002,Plant
+T101,Patient or Disabled Group
+T098,Population Group
+T097,Professional or Occupational Group
+T094,Professional Society
+T080,Qualitative Concept
+T081,Quantitative Concept
+T192,Receptor
+T014,Reptile
+T062,Research Activity
+T075,Research Device
+T089,Regulation or Law
+T167,Substance
+T095,Self-help or Relief Organization
+T054,Social Behavior
+T184,Sign or Symptom
+T082,Spatial Concept
+T110,Steroid
+T024,Tissue
+T079,Temporal Concept
+T061,Therapeutic or Preventive Procedure
+T005,Virus
+T127,Vitamin
+T010,Vertebrate

+ 2 - 0
data/download_content.sh

@@ -0,0 +1,2 @@
+gsutil -m cp -r gs://covid19-public-dataset-aketari/pdf/* gs://$BUCKET_NAME/pdf/
+gsutil -m cp -r gs://covid19-public-dataset-aketari/scispacy_models/en_core_sci_lg-0.2.4.tar.gz ./scispacy_models/

+ 7 - 0
env_variables.sh

@@ -0,0 +1,7 @@
+SA_KEY_PATH="path/to/service_account.json",
+PROJECT_ID="unique_project_id",
+BUCKET_NAME="bucket_contains_data",
+LOCATION="compute_region",
+BQ_DATASET_NAME="covid19",
+BQ_TABLE_NAME="ISMIR",
+TEST_CASE="case14" # lowercase any case from 1 to 49 (e.g case1, case32 ...)

+ 63 - 0
extraction.py

@@ -0,0 +1,63 @@
+from google.cloud import storage, vision
+from google.oauth2 import service_account
+from utils.preprocessing_fcn import async_detect_document, read_json_result, upload_blob
+
+import logging
+import time
+import os
+
+project_id = os.getenv('PROJECT_ID')
+bucket_name = os.getenv('BUCKET_NAME')
+location = os.getenv('LOCATION')
+key_path = os.getenv('SA_KEY_PATH')
+
+credentials = service_account.Credentials.from_service_account_file(key_path)
+
+storage_client = storage.Client(credentials=credentials,
+                                project_id=project_id)
+
+vision_client = vision.Client(credentials=credentials,
+                              project_id=project_id)
+
+lst_pdf_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
+                                          prefix='pdf')
+
+lst_json_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
+                                           prefix='json')
+
+start_time = time.time()
+nbr_documents = len(lst_pdf_blobs)
+for blob in lst_pdf_blobs:
+    doc_title = blob.name.split('/')[-1].split('.pdf')[0]
+
+    # Generate all paths
+    gcs_source_path = 'gs://' + bucket_name + '/' + blob.name
+    json_gcs_dest_path = 'gs://' + bucket_name + '/json/' + blob.name
+
+    # OCR pdf documents
+    async_detect_document(vision_client,
+                          gcs_source_path,
+                          json_gcs_dest_path)
+
+total_time = time.time() - start_time
+logging.info("Vision API successfully completed OCR of all {} documents on {} minutes".format(nbr_documents,
+                                                                                              round(total_time / 60,
+                                                                                                    1)))
+
+start_time = time.time()
+for blob in lst_json_blobs:
+    doc_title = blob.name.split('/')[-1].split('-')[0]
+
+    # Define GCS paths
+    json_gcs_dest_path = 'gs://' + bucket_name + '/{}'.format(blob.name)
+    txt_gcs_dest_path = 'gs://' + bucket_name + '/raw_txt/' + doc_title + '.txt'
+
+    # Parse json
+    all_text = read_json_result(json_gcs_dest_path, doc_title)
+
+    # Upload raw text to GCS
+    upload_blob(all_text, txt_gcs_dest_path)
+
+total_time = time.time() - start_time
+logging.info(
+    'Vision API successfully completed the OCR of all {} documents on {} minutes'.format(round(total_time / 60, 1)))

+ 88 - 0
preprocessing.py

@@ -0,0 +1,88 @@
+from google.cloud import storage
+from google.oauth2 import service_account
+from utils.preprocessing_fcn import batch_translate_text, upload_blob
+import logging
+
+import re
+import time
+import os
+
+project_id = os.getenv('PROJECT_ID')
+bucket_name = os.getenv('BUCKET_NAME')
+location = os.getenv('LOCATION')
+key_path = os.getenv('SA_KEY_PATH')
+
+credentials = service_account.Credentials.from_service_account_file(key_path)
+
+storage_client = storage.Client(credentials=credentials,
+                                project_id=project_id)
+
+lst_json_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
+                                           prefix='json')
+
+customize_stop_words = [
+    'uoc', 'diagnostic', 'interventional', 'radiology', 'madonna', 'delle', 'grazie', 'hospital',
+    'Borgheresi', 'Agostini', 'Ottaviani', 'Floridi', 'Giovagnoni', 'di', 'specialization',
+    'Polytechnic', 'University', 'marche', 'ANCONA', 'Italy', 'Azienda', 'Ospedali',
+    'Riuniti', 'Yorrette', 'Matera', 'Michele', 'Nardella', 'Gerardo', 'Costanzo',
+    'Claudia', 'Lopez', 'st', 'a.', 'a', 'of', 's', 'cien', 'ze', 'diolog', 'ic', 'he',
+    'â', '€', 's', 'b', 'case', 'Cuoladi', 'l', 'c', 'ra', 'bergamo', 'patelli', 'est', 'asst',
+    'dr', 'Dianluigi', 'Svizzero', 'i', 'riccardo', 'Alessandro', 'Spinazzola', 'angelo',
+    'maggiore', 'p', 'r', 't', 'm', 'en', 't', 'o', 'd', 'e', 'n', 'd', 'o', 'g', 'h', 'u'
+]
+
+start_time = time.time()
+for blob in lst_json_blobs:
+    doc_title = blob.name.split('/')[-1].split('-')[0]
+
+    txt_gcs_dest_path = 'gs://' + bucket_name + '/raw_txt/' + doc_title + '.txt'
+    eng_txt_gcs_dest_path = 'gs://' + bucket_name + '/eng_txt/{}/'.format(doc_title)
+    processed_eng_gcs_dest_path = 'gs://' + bucket_name + '/curated_eng_txt/' + doc_title + '.txt'
+
+    # Translate raw text to english
+    try:
+        batch_translate_text(project_id=project_id,
+                             location=location,
+                             input_uri=txt_gcs_dest_path,
+                             output_uri=eng_txt_gcs_dest_path)
+        logging.info("Translation of {} document was successful.".format(doc_title))
+    except Exception, e:
+        logging.error("Error", e)
+
+    # Process eng raw text
+    blob_prefix = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
+                                                                        bucket_name,
+                                                                        doc_title)
+
+    eng_blob = storage_client.get_bucket(bucket_name).get_blob(blob_prefix)
+    eng_raw_string = eng_blob.download_as_string().decode('utf-8')
+
+    # Remove dates
+    # 1 or 2 digit number followed by back slash followed by 1 or 2 digit number ...
+    pattern_dates = '(\d{1,2})/(\d{1,2})/(\d{4})'
+    pattern_fig = 'Figure (\d{1,2})'
+    pattern_image = '^Image .$'
+    replace = ''
+
+    eng_raw_string = re.sub(pattern_dates, replace, eng_raw_string)
+    eng_raw_string = re.sub(pattern_fig, replace, eng_raw_string)
+    eng_raw_string = re.sub(pattern_image, replace, eng_raw_string)
+
+    # remove punctuation and special characters
+    eng_raw_string = re.sub('[^A-Za-z0-9]+', ' ', eng_raw_string)
+
+    # Remove custom stop words
+    tokens = [token for token in eng_raw_string.split() if token not in customize_stop_words]
+
+    refined_doc = ''
+    for word in tokens:
+        refined_doc += ' {}'.format(word)
+
+    # Upload raw text to GCS
+    upload_blob(refined_doc, processed_eng_gcs_dest_path)
+    logging.info("The curation of text in {} completed successfully.".format(doc_title))
+
+total_time = time.time() - start_time
+logging.info('The translation and curation of all documents was completed successfully in {} minutes.'.format(
+    round(total_time / 60, 1)))
+

+ 9 - 0
requirements.txt

@@ -0,0 +1,9 @@
+google-api-core==1.16.0
+google-api-python-client==1.7.11
+google-cloud-bigquery==1.24.0
+google-cloud-datastore==1.11.0
+google-cloud-translate==2.0.1
+google-cloud-vision==1.0.0
+googleapis-common-protos==1.51.0
+pandas==1.0.1
+scispacy==0.2.4

+ 40 - 0
retrieving.py

@@ -0,0 +1,40 @@
+from google.cloud import storage, bigquery, datastore
+from google.oauth2 import service_account
+from utils.bq_fcn import returnQueryResults
+from utils.ner_fcn import getCases
+
+import logging
+import os
+
+project_id = os.getenv('PROJECT_ID')
+bucket_name = os.getenv('BUCKET_NAME')
+location = os.getenv('LOCATION')
+key_path = os.getenv('SA_KEY_PATH')
+dataset_name = os.getenv('BQ_DATASET_NAME')
+table_name = os.getenv('BQ_TABLE_NAME')
+case_id = os.getenv('TEST_CASE')
+
+credentials = service_account.Credentials.from_service_account_file(key_path)
+
+bq_client = bigquery.Client(credentials=credentials,
+                            project_id=project_id)
+
+datastore_client = datastore.Client(credentials=credentials,
+                                    project_id=project_id)
+
+# Returns a list of results
+try:
+    results_lst = returnQueryResults(bq_client, project_id, dataset_name, table_name, case_id)
+    logging.info("Here is the result of the test query: \n {}".format(results_lst))
+except Exception, e:
+    logging.error("Error", e)
+
+try:
+    filter_dict = {'Sign or Symptom':['onset symptoms', "chills"]}
+    results = getCases(datastore_client, filter_dict, limit=10)
+    logging.info("Here is the result of the test query: \n {}".format(results))
+except Exception, e:
+    logging.error("Error", e)
+
+
+

+ 118 - 0
storing.py

@@ -0,0 +1,118 @@
+from google.cloud import storage, bigquery, datastore
+from google.oauth2 import service_account
+from utils.bq_fcn import bqCreateDataset, bqCreateTable, exportItems2BQ
+from utils.ner_fcn import loadModel, addTask, extractMedEntities
+import en_core_sci_lg
+
+import logging
+import re
+import time
+import os
+import pandas as pd
+import sys
+import argparse
+
+project_id = os.getenv('PROJECT_ID')
+bucket_name = os.getenv('BUCKET_NAME')
+location = os.getenv('LOCATION')
+key_path = os.getenv('SA_KEY_PATH')
+dataset_name = os.getenv('BQ_DATASET_NAME')
+table_name = os.getenv('BQ_TABLE_NAME')
+
+credentials = service_account.Credentials.from_service_account_file(key_path)
+
+storage_client = storage.Client(credentials=credentials,
+                                project_id=project_id)
+
+datastore_client = datastore.Client(credentials=credentials,
+                                    project_id=project_id)
+
+gcs_source_prefix = 'raw_txt'
+lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
+                                      prefix=gcs_source_prefix)
+
+start_time = time.time()
+
+try:
+    dataset_id = bqCreateDataset(dataset_name)
+    logging.info("The following dataset {} was successfully created/retrieved.".format(dataset_name))
+except Exception as e:
+    logging.error("An error occurred.", e)
+
+try:
+    table_id = bqCreateTable(dataset_id, table_name)
+    logging.info("The following table {} was successfully created/retrieved.".format(table_name))
+except Exception as e:
+    logging.error("An error occurred.", e)
+
+for blob in lst_blobs:
+    doc_title = blob.name.split('/')[-1].split('.txt')[0]
+
+    # download as string
+    it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
+
+    # set the GCS path
+    path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
+                                                                              bucket_name,
+                                                                              doc_title)
+    eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
+
+    # Upload blob of interest
+    curated_eng_blob = storage_client.get_bucket(bucket_name) \
+        .get_blob('curated_eng_txt/{}.txt'.format(doc_title))
+
+    # populate to BQ dataset
+    exportItems2BQ(dataset_id, table_id, doc_title, it_raw_blob, eng_raw_blob, curated_eng_blob)
+
+total_time = time.time() - start_time
+logging.info('The export to BigQuery was completed successfully and took {} minutes.'.format(round(total_time / 60, 1)))
+
+curated_gcs_source_prefix = 'curated_eng_txt'
+lst_curated_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
+                                              prefix=curated_gcs_source_prefix)
+
+nlp = loadModel(model=en_core_sci_lg)
+
+start_time = time.time()
+for blob in lst_curated_blobs:
+    doc_title = blob.name.split('/')[-1].split('.txt')[0]
+
+    # download as string
+    eng_string = blob.download_as_string().decode('utf-8')
+
+    # convert to vector
+    doc = nlp(eng_string)
+
+    # Extract medical entities
+    UMLS_tuis_entity = extractMedEntities(doc)
+
+    # Generate dataframes
+    entities = list(UMLS_tuis_entity.keys())
+    TUIs = list(UMLS_tuis_entity.values())
+    df_entities = pd.DataFrame(data={'entity': entities, 'TUIs': TUIs})
+    df_reference_TUIs = pd.read_csv('~/data/UMLS_tuis.csv')
+    df_annotated_text_entities = pd.merge(df_entities, df_reference_TUIs, how='inner', on=['TUIs'])
+
+    # Upload entities to datastore
+    entities_dict = {}
+    for idx in range(df_annotated_text_entities.shape[0]):
+        category = df_annotated_text_entities.iloc[idx].values[2]
+        med_entity = df_annotated_text_entities.iloc[idx].values[0]
+
+        # Append to list of entities if the key,value pair already exist
+        try:
+            entities_dict[category].append(med_entity)
+        except:
+            entities_dict[category] = []
+            entities_dict[category].append(med_entity)
+
+        # API call
+    key = addTask(datastore_client, doc_title, entities_dict)
+    logging.info('The upload of {} entities is done.'.format(doc_title))
+
+total_time = time.time() - start_time
+logging.info(
+    "The export to Datastore was completed successfully and took {} minutes.".format(round(total_time / 60, 1)))
+
+
+

+ 0 - 0
utils/__init__.py


+ 128 - 0
utils/bq_fcn.py

@@ -0,0 +1,128 @@
+from google.cloud import bigquery
+from google.oauth2 import service_account
+import logging
+import os
+
+project_id = os.getenv('PROJECT_ID')
+bucket_name = os.getenv('BUCKET_NAME')
+location = os.getenv('LOCATION')
+key_path = os.getenv('SA_KEY_PATH')
+
+credentials = service_account.Credentials.from_service_account_file(key_path)
+
+bq_client = bigquery.Client(credentials=credentials,
+                            project_id=project_id)
+
+
+def bqCreateDataset(dataset_name):
+    """
+    Creates a dataset on Google Cloud Platform.
+    Args:
+        dataset_name: str - Name of the dataset
+    Returns:
+        dataset_id: str - Reference id for the dataset just created
+    """
+    dataset_ref = bq_client.dataset(dataset_name)
+
+    try:
+        dataset_id = bq_client.get_dataset(dataset_ref).dataset_id
+        logging.warning('This dataset name: {} is already used.'.format(dataset_id))
+        return dataset_id
+    except:
+        dataset = bigquery.Dataset(dataset_ref)
+        dataset = bq_client.create_dataset(dataset)
+        logging.info('Dataset {} created.'.format(dataset.dataset_id))
+        return dataset.dataset_id
+
+
+def bqCreateTable(dataset_id, table_name):
+    """
+    Create main table with all cases and the medical text.
+    Args:
+        dataset_id: str - Reference id for the dataset to use
+        table_name: str - Name of the table to create
+
+    Returns:
+        table_id: str - Reference id for the table just created
+    """
+    dataset_ref = bq_client.dataset(dataset_id)
+
+    # Prepares a reference to the table
+    table_ref = dataset_ref.table(table_name)
+
+    try:
+        return bq_client.get_table(table_ref).table_id
+    except:
+        schema = [
+            bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
+            bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
+            bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
+            bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
+                                 description='Output of preprocessing pipeline.')]
+        table = bigquery.Table(table_ref, schema=schema)
+        table = bq_client.create_table(table)
+        logging.info('table {} has been created.'.format(table.table_id))
+        return table.table_id
+
+
+def exportItems2BQ(dataset_id, table_id, case, it_raw_blob, eng_raw_blob, curated_eng_blob):
+    """
+    Export text data to BigQuery.
+    Args:
+        dataset_id:
+        table_id:
+        case:
+        it_raw_blob:
+        eng_raw_blob:
+        curated_eng_blob:
+
+    Returns:
+
+    """
+    # Prepares a reference to the dataset
+    dataset_ref = bq_client.dataset(dataset_id)
+
+    table_ref = dataset_ref.table(table_id)
+    table = bq_client.get_table(table_ref)  # API call
+
+    # Download text from GCS
+    it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
+    eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
+    curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
+
+    rows_to_insert = [{'case': case,
+                       'it_raw_txt': it_raw_txt_string,
+                       'eng_raw_txt': eng_raw_txt_string,
+                       'eng_txt': curated_eng_string
+                       }]
+    errors = bq_client.insert_rows(table, rows_to_insert)  # API request
+    assert errors == []
+    logging.info('{} was added to {} dataset, specifically in {} table.'.format(case,
+                                                                                dataset_id,
+                                                                                table_id))
+
+
+def returnQueryResults(bq_client, project_id, dataset_id, table_id, case_id):
+    """
+    Get results from a BigQuery query.
+    Args:
+        bq_client:
+        project_id:
+        dataset_id:
+        table_id:
+        case_id:
+
+    Returns:
+
+    """
+
+    query = ('SELECT * FROM `{}.{}.{}` WHERE `case`="{}" LIMIT 1'.format(project_id, dataset_id, table_id, case_id))
+
+    try:
+        query_job = bq_client.query(query)
+        is_exist = len(list(query_job.result())) >= 1
+        logging.info('Query case id: {}'.format(case_id) if is_exist \
+                         else "Case id: {} does NOT exist".format(case_id))
+        print (list(query_job.result()))
+    except Exception as e:
+        logging.error("Error", e)

+ 9 - 0
utils/config.py

@@ -0,0 +1,9 @@
+service_account = {
+    'key_path': "path/to/service_account.json",
+}
+
+project_details = {
+    'project_id': 'unique_project_id',
+    'bucket_name': 'bucket_contains_data',
+    'location': 'compute_region'
+}

+ 120 - 0
utils/ner_fcn.py

@@ -0,0 +1,120 @@
+from google.cloud import datastore
+from google.oauth2 import service_account
+import logging
+import re
+import os
+
+import en_core_sci_sm, en_core_sci_lg, en_ner_bionlp13cg_md
+from scispacy.umls_linking import UmlsEntityLinker
+from scispacy.abbreviation import AbbreviationDetector
+
+
+# DEVELOPER: change path to key
+project_id = os.getenv('PROJECT_ID')
+bucket_name = os.getenv('BUCKET_NAME')
+location = os.getenv('LOCATION')
+key_path = os.getenv('SA_KEY_PATH')
+
+credentials = service_account.Credentials.from_service_account_file(key_path)
+
+datastore_client = datastore.Client(credentials=credentials,
+                                    project_id=credentials.project_id)
+
+
+def loadModel(model=en_core_sci_lg):
+    """
+    Loading Named Entity Recognition model.
+    Args:
+        model: options: en_core_sci_sm, en_core_sci_lg, en_ner_bionlp13cg_md
+
+    Returns:
+        nlp: loaded model
+    """
+    # Load the model
+    nlp = model.load()
+
+    # Add pipe features to pipeline
+    linker = UmlsEntityLinker(resolve_abbreviations=True)
+    nlp.add_pipe(linker)
+
+    # Add the abbreviation pipe to the spacy pipeline.
+    abbreviation_pipe = AbbreviationDetector(nlp)
+    nlp.add_pipe(abbreviation_pipe)
+    logging.info("Model and add-ons successfully loaded.")
+    return nlp
+
+
+def extractMedEntities(vectorized_doc):
+    """
+    Returns UMLS entities contained in a text.
+    Args:
+        vectorized_doc:
+
+    Returns:
+        UMLS_tuis_entity: dict - key: entity and value: TUI code
+    """
+    # Pattern for TUI code
+    pattern = 'T(\d{3})'
+
+    UMLS_tuis_entity = {}
+    entity_dict = {}
+
+    linker = UmlsEntityLinker(resolve_abbreviations=True)
+
+    for idx in range(len(vectorized_doc.ents)):
+        entity = vectorized_doc.ents[idx]
+        entity_dict[entity] = ''
+        for umls_ent in entity._.umls_ents:
+            entity_dict[entity] = linker.umls.cui_to_entity[umls_ent[0]]
+
+        # RegEx expression if contains TUI code
+        tui = re.search(pattern, str(entity_dict[entity]))
+        if tui:
+            UMLS_tuis_entity[str(entity)] = tui.group()
+        else:
+            UMLS_tuis_entity[str(entity)] = None
+
+    return UMLS_tuis_entity
+
+
+def addTask(client, doc_title, entities_dict):
+    """
+    Upload entities to Datastore.
+    Args:
+        client:
+        doc_title:
+        entities_dict:
+
+    Returns:
+        Datastore key object.
+    """
+    key = client.key('case', doc_title)
+    task = datastore.Entity(key=key)
+    task.update(
+        entities_dict
+    )
+    client.put(task)
+    # Then get by key for this entity
+    logging.info("Uploaded {} to Datastore.".format(doc_title))
+    return client.get(key)
+
+
+def getCases(datastore_client, filter_dict, limit=10):
+    """
+    Get results of query with custom filters
+    Args:
+        datastore_client: Client object
+        filter_dict: dict - e.g {parameter_A: [entity_name_A, entity_name_B],
+                                parameter_B: [entitiy_name_C]
+                                }
+        limit: int - result limits per default 10
+    Returns:
+        results: list - query results
+    """
+    query = datastore_client.query(kind='case')
+
+    for key, values in filter_dict.items():
+        for value in values:
+            query.add_filter(key, '=', value)
+    results = list(query.fetch(limit=limit))
+    return results

+ 155 - 0
utils/preprocessing_fcn.py

@@ -0,0 +1,155 @@
+from google.cloud import storage, translate, vision
+from google.oauth2 import service_account
+import logging
+import os
+
+from google.protobuf import json_format
+
+# DEVELOPER: change path to key
+project_id = os.getenv('PROJECT_ID')
+bucket_name = os.getenv('BUCKET_NAME')
+location = os.getenv('LOCATION')
+key_path = os.getenv('SA_KEY_PATH')
+
+# DEVELOPER: change path to key
+credentials = service_account.Credentials.from_service_account_file(key_path)
+
+storage_client = storage.Client(credentials=credentials,
+                                project_id=credentials.project_id)
+
+translate_client = translate.Client(credentials=credentials,
+                                    project_id=credentials.project_id)
+
+vision_client = vision.Client(credentials=credentials,
+                              project_id=credentials.project_id)
+
+
+def async_detect_document(vision_client, gcs_source_uri, gcs_destination_uri, batch_size=20):
+    """
+    OCR with PDF/TIFF as source files on GCS
+    Args:
+        vision_client:
+        gcs_source_uri:
+        gcs_destination_uri:
+        batch_size: How many pages should be grouped into each json output file.
+
+    Returns:
+
+    """
+    doc_title = gcs_source_uri.split('/')[-1].split('.pdf')[0]
+
+    # Supported mime_types are: 'application/pdf' and 'image/tiff'
+    mime_type = 'application/pdf'
+
+    # Feature in vision API
+    feature = vision.types.Feature(
+        type=vision.enums.Feature.Type.DOCUMENT_TEXT_DETECTION)
+
+    gcs_source = vision.types.GcsSource(uri=gcs_source_uri)
+    input_config = vision.types.InputConfig(
+        gcs_source=gcs_source, mime_type=mime_type)
+
+    gcs_destination = vision.types.GcsDestination(uri=gcs_destination_uri)
+    output_config = vision.types.OutputConfig(
+        gcs_destination=gcs_destination, batch_size=batch_size)
+
+    async_request = vision.types.AsyncAnnotateFileRequest(
+        features=[feature], input_config=input_config,
+        output_config=output_config)
+
+    operation = vision_client.async_batch_annotate_files(
+        requests=[async_request])
+
+    # print('Waiting for the operation to finish.')
+    operation.result(timeout=180)
+    logging.info('Text extraction from document {} is completed.'.format(doc_title))
+
+
+def read_json_result(bucket_name, doc_title):
+    """
+    Parsing the json files and extract text.
+    Args:
+        bucket_name:
+        doc_title:
+
+    Returns:
+        all_text: str - Containing all text of the document
+    """
+    gcs_destination_prefix = 'json/' + '{}-'.format(doc_title)
+
+    # List objects with the given prefix.
+    blob_list = list(storage_client.list_blobs(bucket_or_name=bucket_name,
+                                               prefix=gcs_destination_prefix))
+    all_text = ''
+    for blob in blob_list:
+
+        json_string = blob.download_as_string()
+        response = json_format.Parse(
+            json_string, vision.types.AnnotateFileResponse())
+
+        # The actual response for the first page of the input file.
+        for response in response.responses:
+            # first_page_response = response.responses[0]
+            text_response = response.full_text_annotation.text
+            all_text += text_response
+            all_text += ' '
+
+    logging.info("Parsing of {} json doc was successful.".format(doc_title))
+    return all_text
+
+
+def upload_blob(bucket_name, txt_content, destination_blob_name):
+    """
+    Uploads a file to the bucket.
+    Args:
+        bucket_name:
+        txt_content:
+        destination_blob_name:
+
+    Returns:
+
+    """
+    destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
+    bucket = storage_client.bucket(bucket_name)
+    blob = bucket.blob(destination_blob_name)
+
+    blob.upload_from_string(txt_content)
+
+    print("Text uploaded to {}".format(destination_blob_name))
+
+
+def batch_translate_text(project_id, location,
+                         input_uri="gs://YOUR_BUCKET_ID/path/to/your/file.txt",
+                         output_uri="gs://YOUR_BUCKET_ID/path/to/save/results/"):
+    """
+    Translates a batch of texts on GCS and stores the result in a GCS location.
+    Args:
+        project_id:
+        location:
+        input_uri:
+        output_uri:
+
+    Returns:
+
+    """
+
+    # Supported file types: https://cloud.google.com/translate/docs/supported-formats
+    gcs_source = {"input_uri": input_uri}
+
+    input_configs_element = {
+        "gcs_source": gcs_source,
+        "mime_type": "text/plain"  # Can be "text/plain" or "text/html".
+    }
+    gcs_destination = {"output_uri_prefix": output_uri}
+    output_config = {"gcs_destination": gcs_destination}
+    parent = translate_client.location_path(project_id, location)
+
+    # Supported language codes: https://cloud.google.com/translate/docs/language
+    operation = translate_client.batch_translate_text(
+        parent=parent,
+        source_language_code="it",
+        target_language_codes=["en"],  # Up to 10 language codes here.
+        input_configs=[input_configs_element],
+        output_config=output_config)
+
+    response = operation.result(180)