bq_fcn.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. from google.cloud import bigquery
  2. import logging
  3. def bqCreateDataset(bq_client, dataset_name):
  4. """
  5. Creates a dataset on Google Cloud Platform.
  6. Args:
  7. bq_client - BigQuery client instantiation -
  8. dataset_name: str - Name of the dataset
  9. Returns:
  10. dataset_id: str - Reference id for the dataset just created
  11. """
  12. dataset_ref = bq_client.dataset(dataset_name)
  13. try:
  14. dataset_id = bq_client.get_dataset(dataset_ref).dataset_id
  15. logging.warning('This dataset name: {} is already used.'.format(dataset_id))
  16. return dataset_id
  17. except:
  18. dataset = bigquery.Dataset(dataset_ref)
  19. dataset = bq_client.create_dataset(dataset)
  20. logging.info('Dataset {} created.'.format(dataset.dataset_id))
  21. return dataset.dataset_id
  22. def bqCreateTable(bq_client, dataset_id, table_name):
  23. """
  24. Create main table with all cases and the medical text.
  25. Args:
  26. bq_client: BigQuery client instantiation -
  27. dataset_id: str - Reference id for the dataset to use
  28. table_name: str - Name of the table to create
  29. Returns:
  30. table_id: str - Reference id for the table just created
  31. """
  32. dataset_ref = bq_client.dataset(dataset_id)
  33. # Prepares a reference to the table
  34. table_ref = dataset_ref.table(table_name)
  35. try:
  36. return bq_client.get_table(table_ref).table_id
  37. except:
  38. schema = [
  39. bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
  40. bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
  41. bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
  42. bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
  43. description='Output of preprocessing pipeline.')]
  44. table = bigquery.Table(table_ref, schema=schema)
  45. table = bq_client.create_table(table)
  46. logging.info('table {} has been created.'.format(table.table_id))
  47. return table.table_id
  48. def exportItems2BQ(bq_client, dataset_id, table_id, case, it_raw_blob, eng_raw_blob, curated_eng_blob):
  49. """
  50. Export text data to BigQuery.
  51. Args:
  52. bq_client: BigQuery client instance -
  53. dataset_id: str -
  54. table_id: str -
  55. case: str -
  56. it_raw_blob:gcs blob object -
  57. eng_raw_blob: gcs blob object -
  58. curated_eng_blob: gcs blob object -
  59. Returns:
  60. Logging completion
  61. """
  62. # Prepares a reference to the dataset
  63. dataset_ref = bq_client.dataset(dataset_id)
  64. table_ref = dataset_ref.table(table_id)
  65. table = bq_client.get_table(table_ref) # API call
  66. # Download text from GCS
  67. it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
  68. eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
  69. curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
  70. rows_to_insert = [{'case': case,
  71. 'it_raw_txt': it_raw_txt_string,
  72. 'eng_raw_txt': eng_raw_txt_string,
  73. 'eng_txt': curated_eng_string
  74. }]
  75. errors = bq_client.insert_rows(table, rows_to_insert) # API request
  76. assert errors == []
  77. return logging.info('{} was added to {} dataset, specifically in {} table.'.format(case,
  78. dataset_id,
  79. table_id))
  80. def constructQuery(column_lst, case_id):
  81. """
  82. Construct the query to public dataset: aketari-covid19-public.covid19.ISMIR
  83. Args:
  84. column_lst: list - ["*"] or ["column_name1", "column_name2" ...]
  85. case_id: str - Optional e.g "case1"
  86. Returns:
  87. query object
  88. """
  89. # Public dataset
  90. # project_id = 'aketari-covid19-public'
  91. # dataset_id = 'covid19'
  92. # table_id = 'ISMIR'
  93. if (len(column_lst) == 1) and column_lst[0] == "*":
  94. query = ('SELECT * FROM `aketari-covid19-public.covid19.ISMIR` '
  95. 'WHERE `case`="{}" '.format(case_id))
  96. return query
  97. else:
  98. columns_str = ", ".join(column_lst)
  99. query = ('SELECT {} FROM `aketari-covid19-public.covid19.ISMIR` '
  100. 'WHERE `case`="{}" '.format(columns_str, case_id))
  101. return query
  102. def returnQueryResults(bq_client, query):
  103. """
  104. Get results from a BigQuery query.
  105. Args:
  106. bq_client: BigQuery client instantiation -
  107. query: query object
  108. Returns:
  109. list of all rows of the query
  110. """
  111. try:
  112. query_job = bq_client.query(query)
  113. return list(query_job.result())
  114. except Exception as e:
  115. return logging.error("Error", e)
  116. def populateBQ(bq_client, storage_client, bucket_name, dataset_name, table_name):
  117. """
  118. Populate BigQuery dataset.
  119. Args:
  120. bq_client: BigQuery client instantiation -
  121. storage_client:
  122. bucket_name:
  123. dataset_name:
  124. table_name:
  125. Returns:
  126. Populated BigQuery data warehouse
  127. """
  128. try:
  129. dataset_id = bqCreateDataset(bq_client, dataset_name)
  130. logging.info("The following dataset {} was successfully created/retrieved.".format(dataset_name))
  131. except Exception as e:
  132. logging.error("An error occurred.", e)
  133. try:
  134. table_id = bqCreateTable(bq_client, dataset_id, table_name)
  135. logging.info("The following table {} was successfully created/retrieved.".format(table_name))
  136. except Exception as e:
  137. logging.error("An error occurred.", e)
  138. gcs_source_prefix = 'raw_txt'
  139. lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
  140. prefix=gcs_source_prefix)
  141. for blob in lst_blobs:
  142. doc_title = blob.name.split('/')[-1].split('.txt')[0]
  143. # download as string
  144. it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
  145. # set the GCS path
  146. path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title, bucket_name, doc_title)
  147. eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
  148. # Upload blob of interest
  149. curated_eng_blob = storage_client.get_bucket(bucket_name) \
  150. .get_blob('curated_eng_txt/{}.txt'.format(doc_title))
  151. # populate to BQ dataset
  152. exportItems2BQ(bq_client, dataset_id, table_id, doc_title, it_raw_blob, eng_raw_blob, curated_eng_blob)