# bq_fcn.py

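"""Helper functions to create a BigQuery dataset and table and to load raw and
curated text documents from Cloud Storage into BigQuery."""
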
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import os
import logging


def bqCreateDataset(bq_client, dataset_name):
    """
    Creates a dataset on Google Cloud Platform.
    Args:
        bq_client: bigquery.Client - BigQuery client instance
        dataset_name: str - Name of the dataset
    Returns:
        dataset_id: str - Reference id for the dataset just created
    """
    dataset_ref = bq_client.dataset(dataset_name)
    try:
        # Reuse the dataset if it already exists.
        dataset_id = bq_client.get_dataset(dataset_ref).dataset_id
        logging.warning('The dataset name {} is already in use.'.format(dataset_id))
        return dataset_id
    except NotFound:
        dataset = bigquery.Dataset(dataset_ref)
        dataset = bq_client.create_dataset(dataset)
        logging.info('Dataset {} created.'.format(dataset.dataset_id))
        return dataset.dataset_id
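
# Usage sketch (assumes application default credentials; the dataset name is
# illustrative):
#   bq_client = bigquery.Client()
#   dataset_id = bqCreateDataset(bq_client, 'covid19')
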
def bqCreateTable(bq_client, dataset_id, table_name):
    """
    Creates the main table with all cases and the medical text.
    Args:
        bq_client: bigquery.Client - BigQuery client instance
        dataset_id: str - Reference id of the dataset to use
        table_name: str - Name of the table to create
    Returns:
        table_id: str - Reference id for the table just created
    """
    dataset_ref = bq_client.dataset(dataset_id)
    # Prepares a reference to the table
    table_ref = dataset_ref.table(table_name)
    try:
        # Reuse the table if it already exists.
        return bq_client.get_table(table_ref).table_id
    except NotFound:
        schema = [
            bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
                                 description='Output of preprocessing pipeline.'),
        ]
        table = bigquery.Table(table_ref, schema=schema)
        table = bq_client.create_table(table)
        logging.info('Table {} has been created.'.format(table.table_id))
        return table.table_id
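
# Usage sketch (the table name is illustrative, mirroring the public ISMIR table):
#   table_id = bqCreateTable(bq_client, dataset_id, 'ISMIR')
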
def exportItems2BQ(bq_client, dataset_id, table_id, case, it_raw_blob,
                   eng_raw_blob, curated_eng_blob):
    """
    Exports text data to BigQuery.
    Args:
        bq_client: bigquery.Client - BigQuery client instance
        dataset_id: str - Reference id of the dataset
        table_id: str - Reference id of the table
        case: str - Case id, e.g. "case1"
        it_raw_blob: storage.Blob - GCS blob holding the raw Italian text
        eng_raw_blob: storage.Blob - GCS blob holding the raw English translation
        curated_eng_blob: storage.Blob - GCS blob holding the curated English text
    Returns:
        None - logs completion
    """
    # Prepares a reference to the dataset and table
    dataset_ref = bq_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = bq_client.get_table(table_ref)  # API call
    # Download text from GCS
    it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
    eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
    curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
    rows_to_insert = [{'case': case,
                       'it_raw_txt': it_raw_txt_string,
                       'eng_raw_txt': eng_raw_txt_string,
                       'eng_txt': curated_eng_string}]
    errors = bq_client.insert_rows(table, rows_to_insert)  # API request
    assert errors == [], 'Streaming insert failed: {}'.format(errors)
    logging.info('{} was added to the {} table in the {} dataset.'.format(case, table_id, dataset_id))
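
# Usage sketch (the bucket name is hypothetical; the object paths mirror the
# layout used by populateBQ below):
#   bucket = storage_client.get_bucket('my-bucket')  # hypothetical bucket
#   it_raw_blob = bucket.get_blob('raw_txt/case1.txt')
#   eng_raw_blob = bucket.get_blob('eng_txt/case1.txt')
#   curated_eng_blob = bucket.get_blob('curated_eng_txt/case1.txt')
#   exportItems2BQ(bq_client, dataset_id, table_id, 'case1',
#                  it_raw_blob, eng_raw_blob, curated_eng_blob)
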
def constructQuery(column_lst, case_id):
    """
    Constructs a query against the public dataset aketari-covid19-public.covid19.ISMIR.
    Args:
        column_lst: list - ["*"] or ["column_name1", "column_name2", ...]
        case_id: str - Case id, e.g. "case1"
    Returns:
        query: str - SQL query string
    """
    # Public dataset:
    # project_id = 'aketari-covid19-public'
    # dataset_id = 'covid19'
    # table_id = 'ISMIR'
    if len(column_lst) == 1 and column_lst[0] == "*":
        query = ('SELECT * FROM `aketari-covid19-public.covid19.ISMIR` '
                 'WHERE `case`="{}" '.format(case_id))
        return query
    else:
        columns_str = ", ".join(column_lst)
        query = ('SELECT {} FROM `aketari-covid19-public.covid19.ISMIR` '
                 'WHERE `case`="{}" '.format(columns_str, case_id))
        return query
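
# Example of a generated query string:
#   constructQuery(['it_raw_txt', 'eng_txt'], 'case1') returns
#   'SELECT it_raw_txt, eng_txt FROM `aketari-covid19-public.covid19.ISMIR` WHERE `case`="case1" '
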
def returnQueryResults(bq_client, query):
    """
    Gets the results of a BigQuery query.
    Args:
        bq_client: bigquery.Client - BigQuery client instance
        query: str - SQL query string
    Returns:
        list of all rows returned by the query
    """
    try:
        query_job = bq_client.query(query)
        return list(query_job.result())
    except Exception as e:
        logging.error('Error: %s', e)
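
# Usage sketch, combining constructQuery and returnQueryResults:
#   query = constructQuery(['*'], 'case1')
#   rows = returnQueryResults(bq_client, query)
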
def populateBQ(bq_client, storage_client, bucket_name, dataset_name, table_name):
    """
    Populates the BigQuery dataset.
    Args:
        bq_client: bigquery.Client - BigQuery client instance
        storage_client: storage.Client - Cloud Storage client instance
        bucket_name: str - Name of the bucket holding the curated text
        dataset_name: str - Name of the dataset to create or reuse
        table_name: str - Name of the table to create or reuse
    Returns:
        None - populates the BigQuery data warehouse
    """
    try:
        dataset_id = bqCreateDataset(bq_client, dataset_name)
        logging.info("The dataset {} was successfully created/retrieved.".format(dataset_name))
    except Exception as e:
        logging.error("An error occurred: %s", e)
        raise  # without dataset_id the rest of the pipeline cannot run
    try:
        table_id = bqCreateTable(bq_client, dataset_id, table_name)
        logging.info("The table {} was successfully created/retrieved.".format(table_name))
    except Exception as e:
        logging.error("An error occurred: %s", e)
        raise  # without table_id the rest of the pipeline cannot run

    src_bucket = os.environ['BUCKET_NAME']
    dest_bucket = src_bucket
    gcs_source_prefix = 'raw_txt'
    lst_blobs = storage_client.list_blobs(bucket_or_name=src_bucket,
                                          prefix=gcs_source_prefix)

    for blob in lst_blobs:
        doc_title = blob.name.split('/')[-1].split('.txt')[0]

        # Blob holding the raw source text
        current_bucket = storage_client.get_bucket(src_bucket)
        it_raw_blob = current_bucket.get_blob('raw_txt/{}.txt'.format(doc_title))

        # Set the GCS path of the raw English translation
        try:
            # Path used when batch translation produced the file
            path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title, dest_bucket, doc_title)
            eng_raw_blob = storage_client.get_bucket(dest_bucket).get_blob(path_blob_eng_raw)
            # If the file is absent, get_blob returns None and decoding it raises AttributeError
            eng_raw_txt = eng_raw_blob.download_as_string().decode('utf-8')
        except AttributeError:
            # New path introduced with the pdf update
            path_blob_eng_raw = 'eng_txt/{}.txt'.format(doc_title)
            eng_raw_blob = storage_client.get_bucket(dest_bucket).get_blob(path_blob_eng_raw)

        # Blob holding the curated English text
        curated_eng_blob = storage_client.get_bucket(bucket_name) \
            .get_blob('curated_eng_txt/{}.txt'.format(doc_title))

        # Populate the BigQuery table
        exportItems2BQ(bq_client, dataset_id, table_id, doc_title, it_raw_blob, eng_raw_blob, curated_eng_blob)
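

# Minimal driver sketch (assumes BUCKET_NAME is set and application default
# credentials are configured; the dataset/table names are illustrative):
if __name__ == '__main__':
    from google.cloud import storage

    logging.basicConfig(level=logging.INFO)
    bq_client = bigquery.Client()
    storage_client = storage.Client()
    populateBQ(bq_client,
               storage_client,
               bucket_name=os.environ['BUCKET_NAME'],
               dataset_name='covid19',
               table_name='ISMIR')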