# bq_fcn.py

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import logging


def bqCreateDataset(bq_client, dataset_name):
    """
    Creates a dataset on Google Cloud Platform.
    Args:
        bq_client: bigquery.Client - Authenticated BigQuery client
        dataset_name: str - Name of the dataset
    Returns:
        dataset_id: str - Reference id for the dataset just created
    """
    dataset_ref = bq_client.dataset(dataset_name)
    try:
        # If the dataset already exists, reuse it instead of failing.
        dataset_id = bq_client.get_dataset(dataset_ref).dataset_id
        logging.warning('This dataset name: {} is already used.'.format(dataset_id))
        return dataset_id
    except NotFound:
        dataset = bigquery.Dataset(dataset_ref)
        dataset = bq_client.create_dataset(dataset)  # API request
        logging.info('Dataset {} created.'.format(dataset.dataset_id))
        return dataset.dataset_id
def bqCreateTable(bq_client, dataset_id, table_name):
    """
    Creates the main table with all cases and the medical text.
    Args:
        bq_client: bigquery.Client - Authenticated BigQuery client
        dataset_id: str - Reference id for the dataset to use
        table_name: str - Name of the table to create
    Returns:
        table_id: str - Reference id for the table just created
    """
    dataset_ref = bq_client.dataset(dataset_id)
    # Prepares a reference to the table
    table_ref = dataset_ref.table(table_name)
    try:
        # If the table already exists, reuse it instead of failing.
        return bq_client.get_table(table_ref).table_id
    except NotFound:
        schema = [
            bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
                                 description='Output of preprocessing pipeline.')]
        table = bigquery.Table(table_ref, schema=schema)
        table = bq_client.create_table(table)  # API request
        logging.info('Table {} has been created.'.format(table.table_id))
        return table.table_id
def exportItems2BQ(bq_client, dataset_id, table_id, case, it_raw_blob,
                   eng_raw_blob, curated_eng_blob):
    """
    Exports text data to BigQuery.
    Args:
        bq_client: bigquery.Client - Authenticated BigQuery client
        dataset_id: str - Reference id for the dataset to use
        table_id: str - Reference id for the table to write to
        case: str - Case identifier for the row
        it_raw_blob: storage.Blob - GCS blob holding the raw Italian text
        eng_raw_blob: storage.Blob - GCS blob holding the raw English translation
        curated_eng_blob: storage.Blob - GCS blob holding the curated English text
    Returns:
        None
    """
    # Prepares references to the dataset and table
    dataset_ref = bq_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = bq_client.get_table(table_ref)  # API call
    # Downloads the text from GCS
    it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
    eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
    curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
    rows_to_insert = [{'case': case,
                       'it_raw_txt': it_raw_txt_string,
                       'eng_raw_txt': eng_raw_txt_string,
                       'eng_txt': curated_eng_string}]
    errors = bq_client.insert_rows(table, rows_to_insert)  # API request
    assert errors == [], 'Streaming insert failed: {}'.format(errors)
    logging.info('{} was added to the {} dataset, specifically in the {} table.'.format(
        case, dataset_id, table_id))
def returnQueryResults(bq_client, project_id, dataset_id, table_id, case_id):
    """
    Gets the results of a BigQuery query for a given case id.
    Args:
        bq_client: bigquery.Client - Authenticated BigQuery client
        project_id: str - GCP project id
        dataset_id: str - Reference id for the dataset to query
        table_id: str - Reference id for the table to query
        case_id: str - Case identifier to look up
    Returns:
        None
    """
    query = 'SELECT * FROM `{}.{}.{}` WHERE `case`="{}" LIMIT 1'.format(
        project_id, dataset_id, table_id, case_id)
    try:
        query_job = bq_client.query(query)
        rows = list(query_job.result())  # Waits for the query to finish
        if rows:
            logging.info('Query case id: {}'.format(case_id))
        else:
            logging.info('Case id: {} does NOT exist'.format(case_id))
        logging.info(rows)
    except Exception as e:
        logging.error('Error: {}'.format(e))
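

# A minimal end-to-end sketch of how these helpers chain together. The dataset,
# table, bucket, and blob names below are hypothetical placeholders, not values
# from this module; adapt them to your own GCP resources before running.
if __name__ == '__main__':
    from google.cloud import storage

    logging.basicConfig(level=logging.INFO)
    bq_client = bigquery.Client()
    gcs_client = storage.Client()

    dataset_id = bqCreateDataset(bq_client, 'medical_txt')    # hypothetical name
    table_id = bqCreateTable(bq_client, dataset_id, 'cases')  # hypothetical name

    # Hypothetical bucket/blob layout: one raw-Italian, one raw-English, and
    # one curated-English text file per case.
    bucket = gcs_client.bucket('my-medical-texts')
    exportItems2BQ(bq_client, dataset_id, table_id, 'case-001',
                   bucket.blob('case-001/it_raw.txt'),
                   bucket.blob('case-001/eng_raw.txt'),
                   bucket.blob('case-001/eng_curated.txt'))

    returnQueryResults(bq_client, bq_client.project, dataset_id, table_id, 'case-001')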